# Copyright (c) 2022-2026, Yongchao Wu in Aalto University
# This file is from the mdapy project, released under the BSD 3-Clause License.
"""
Core System Class for MDAPY
===========================
This module provides the central System class that serves as the main interface
for atomic simulation data and analysis in mdapy. The System class integrates
all analysis modules and provides a unified API for structural analysis,
property calculations, and data manipulation.
Classes
-------
System : Core class representing an atomic system
"""
from mdapy.box import Box
from mdapy.load_save import BuildSystem, SaveSystem
from mdapy.knn import NearestNeighbor
from mdapy.neighbor import Neighbor
from mdapy.voronoi import Voronoi
from mdapy.structure_entropy import StructureEntropy
from mdapy.cluster_analysis import ClusterAnalysis
from mdapy.calculator import CalculatorMP
from mdapy.identify_diamond_structure import IdentifyDiamondStructure
from mdapy.polyhedral_template_matching import PolyhedralTemplateMatching
from mdapy.common_neighbor_analysis import CommonNeighborAnalysis
from mdapy.steinhardt_bond_orientation import SteinhardtBondOrientation
from mdapy.radial_distribution_function import RadialDistributionFunction
from mdapy.structure_factor import StructureFactor
from mdapy.centro_symmetry_parameter import CentroSymmetryParameter
from mdapy.warren_cowley_parameter import WarrenCowleyParameter
from mdapy.identify_fcc_planar_faults import IdentifyFccPlanarFaults
from mdapy.ackland_jones_analysis import AcklandJonesAnalysis
from mdapy.common_neighbor_parameter import CommonNeighborParameter
from mdapy.atomic_temperature import AtomicTemperature
from mdapy.bond_analysis import BondAnalysis
from mdapy.chill_plus import ChillPlus
from mdapy.build_bond import build_bond as _build_bond_pairs
from mdapy.angular_distribution_function import AngularDistributionFunction
import mdapy.tool_function as tool
from mdapy.data import atomic_numbers, vdw_radii
from typing import Optional, Dict, TYPE_CHECKING, Union, Iterable, Any, List, Tuple
import numpy as np
import polars as pl
if TYPE_CHECKING:
from ase import Atoms
from ovito.data import DataCollection
[docs]
class System:
"""
Core class representing an atomic system with integrated analysis capabilities.
The System class is the central data structure in mdapy, providing a unified
interface for loading, manipulating, and analyzing atomic configurations. It
integrates various structural analysis methods, property calculators, and
visualization tools.
Parameters
----------
filename : str, optional
Path to an atomic configuration file. Supports various formats including
POSCAR, LAMMPS data/dump, XYZ, mp etc. Format is auto-detected or can
be specified via the `format` parameter.
data : pl.DataFrame, optional
Polars DataFrame containing atomic information. Must include columns
'x', 'y', 'z' for positions. Other common columns include 'type',
'element', 'vx', 'vy', 'vz', 'fx', 'fy', 'fz', etc.
pos : np.ndarray, optional
Array of shape (N, 3) containing atomic positions. Used in conjunction
with `box` parameter.
box : int, float, Iterable[float], np.ndarray, or Box, optional
Simulation box specification. Required when using `data` or `pos` parameters.
See :class:`~mdapy.box.Box` for supported formats.
ase_atom : ase.Atoms, optional
ASE Atoms object to initialize the system from.
ovito_atom : ovito.data.DataCollection, optional
OVITO DataCollection object to initialize the system from.
format : str, optional
File format specification when loading from `filename`.
If None, format is auto-detected.
global_info : dict, optional
Dictionary containing global system properties such as total energy,
virial, stress tensor, timestep, etc. Defaults to empty dict.
Attributes
----------
verlet_list : np.ndarray, optional
Neighbor list array when neighbors are built.
distance_list : np.ndarray, optional
Distance list array when neighbors are built.
neighbor_number : np.ndarray, optional
Number of neighbors for each atom when neighbors are built.
rc : float, optional
Cutoff radius used for neighbor list construction.
Notes
-----
The System class uses lazy evaluation for many analyses - neighbor lists
and other expensive computations are only performed when needed. Results
are cached and reused when possible.
At initialization, exactly one of the following must be provided:
- `filename`
- `data` and `box`
- `pos` and `box`
- `ase_atom`
- `ovito_atom`
Examples
--------
Creating a system from different sources:
.. code-block:: python
from mdapy.system import System
import numpy as np
# Load from file (auto-detect format)
system = System("config.dump")
# Load from file with explicit format
system = System("POSCAR", format="poscar")
# Create from numpy array
pos = np.random.rand(100, 3) * 10
system = System(pos=pos, box=10)
# Create from polars DataFrame
import polars as pl
data = pl.DataFrame(
{
"x": np.random.rand(100),
"y": np.random.rand(100),
"z": np.random.rand(100),
"type": np.ones(100, dtype=int),
}
)
system = System(data=data, box=[10, 10, 10])
Performing structural analysis:
.. code-block:: python
# Identify crystal structures
system.cal_polyhedral_template_matching()
# Calculate radial distribution function
rdf = system.cal_radial_distribution_function(rc=10.0, nbin=200)
# Perform cluster analysis
system.cal_cluster_analysis(rc=3.0)
Accessing and modifying data:
.. code-block:: python
# Get positions
pos = system.data.select("x", "y", "z")
# Add a new column
new_data = system.data.with_columns(energy=np.random.rand(system.N))
system.update_data(new_data)
# Save to file
system.write("output.dump")
See Also
--------
mdapy.box.Box : Simulation box representation
mdapy.calculator.CalculatorMP : Abstract calculator interface
mdapy.load_save.BuildSystem : System construction utilities
mdapy.load_save.SaveSystem : System export utilities
"""
def __init__(
self,
filename: Optional[str] = None,
data: Optional[pl.DataFrame] = None,
pos: Optional[np.ndarray] = None,
box: Union[int, float, Iterable[float], np.ndarray, Box] = None,
ase_atom: Optional["Atoms"] = None,
ovito_atom: Optional["DataCollection"] = None,
format: Optional[str] = None,
global_info: Optional[Dict[str, Any]] = None,
):
"""Initialize a System object from various data sources."""
self.__global_info = {}
if isinstance(filename, str):
self.__data, self.box, self.__global_info = BuildSystem.from_file(
filename, format
)
elif data is not None and box is not None:
self.__data, self.box = BuildSystem.from_data(data, box)
elif pos is not None and box is not None:
self.__data, self.box = BuildSystem.from_array(pos, box)
elif ase_atom is not None:
self.__data, self.box = BuildSystem.from_ase(ase_atom)
elif ovito_atom is not None:
self.__data, self.box, self.__global_info = BuildSystem.from_ovito(
ovito_atom
)
else:
raise RuntimeError(
"One must at least provide filename or [data, box] or [pos, box] or ase_atom or ovito_atom."
)
# Make sure the initial DataFrame is single-chunked. The various
# BuildSystem readers are inconsistent on this point and a multi-
# chunk frame would later break `Series.to_numpy(allow_copy=False)`.
self.__data = self.__data.rechunk()
if not len(self.__global_info) and global_info is not None:
self.__global_info = dict(global_info)
self.__calc: Optional[CalculatorMP] = None
@property
def box(self) -> Box:
"""The simulation box.
Assigning to this attribute (``system.box = ...``) is equivalent to
calling :meth:`update_box` *without* ``scale_pos`` — i.e. atoms keep
their Cartesian coordinates while the cell vectors change. Any
cached neighbor / bond / Voronoi data is invalidated, since indices
and distances depend on the box.
"""
return self.__box
@box.setter
def box(self, value: Union[int, float, Iterable[float], np.ndarray, Box]):
new_box = value if isinstance(value, Box) else Box(value)
self.__box = new_box
# Invalidate caches that depend on the box. During __init__ none of
# these attrs exist yet, so the hasattr guards make this a no-op.
if isinstance(getattr(self, "_System__calc", None), CalculatorMP):
self.__calc.results = {}
for attr in ("verlet_list", "neighbor_number", "distance_list", "rc",
"bond", "voro_verlet_list", "voro_distance_list",
"voro_face_area", "voro_neighbor_number",
"_enlarge_box", "_enlarge_data"):
if hasattr(self, attr):
delattr(self, attr)
@property
def calc(self) -> Optional[CalculatorMP]:
return self.__calc
@calc.setter
def calc(self, value):
if not isinstance(value, CalculatorMP):
raise TypeError(
f"calc must be CalculatorMP, instead of {type(value).__name__}"
)
value.results = {}
self.__calc = value
@property
def global_info(self) -> Dict[str, Any]:
"""
Obtain global system information.
Returns
-------
dict
Dictionary containing global properties such as total energy,
virial tensor, stress tensor, timestep, temperature, etc.
Keys and values depend on the data source and attached calculator.
"""
return self.__global_info
@property
def data(self) -> pl.DataFrame:
"""
Access the atomic data DataFrame.
Returns
-------
pl.DataFrame
Polars DataFrame containing per-atom properties. Always includes
columns 'x', 'y', 'z' for Cartesian coordinates. May include
additional columns such as 'type', 'element', 'vx', 'vy', 'vz'
(velocities), 'fx', 'fy', 'fz' (forces), and various computed
properties from analysis methods.
Examples
--------
>>> system = System("config.dump")
>>> print(system.data.columns)
['x', 'y', 'z', 'type', 'vx', 'vy', 'vz']
"""
return self.__data
@property
def N(self) -> int:
"""
Get the number of atoms in the system.
Returns
-------
int
Total number of atoms/particles in the system.
Examples
--------
>>> system = System(pos=np.random.rand(100, 3), box=10)
>>> print(system.N)
100
"""
return self.__data.shape[0]
def __repr__(self) -> str:
"""
Return string representation of the System.
Returns
-------
str
Multi-line string containing atom number, box information,
global properties, and a summary of the data DataFrame.
"""
if not self.__global_info:
return f"Atom Number: {self.N}\n{self.box}\nParticle Information:\n{self.__data}"
else:
info = f"Atom Number: {self.N}\n{self.box}\n"
for key in self.__global_info.keys():
info += f"{key}: {self.__global_info[key]}\n"
info += f"Particle Information:\n{self.__data}"
return info
[docs]
def set_element(self, element: Union[str, List[str], Tuple[str], np.ndarray]):
"""
Set element names for atoms in the system.
Parameters
----------
element : str, list of str, tuple of str, or np.ndarray
Element specification:
* **str**: Assigns the same element to all atoms
* **list/tuple/array**: Must have length equal to N, specifying
element for each atom individually
Examples
--------
Assign same element to all atoms:
>>> system.set_element("Cu")
Assign different elements:
>>> elements = ["Cu"] * 50 + ["Al"] * 50
>>> system.set_element(elements)
Notes
-----
This method updates the 'element' column in the data DataFrame.
If an 'element' column exists, it is replaced.
"""
if isinstance(element, str):
df = self.__data.with_columns(pl.lit(element).alias("element"))
else:
assert len(element) == self.N, (
f"Length of element ({len(element)}) must equal the atom "
f"number ({self.N})."
)
if "element" in self.data.columns:
df = self.__data.select(pl.all().exclude("element")).with_columns(
pl.lit(np.array(element)).alias("element")
)
else:
df = self.__data.with_columns(
pl.lit(np.array(element)).alias("element")
)
self.update_data(df, True)
[docs]
def set_type_by_element(
self, element_list: Union[List[str], Tuple[str], np.ndarray]
):
"""
Set atom types based on element ordering.
Parameters
----------
element_list : list of str, tuple of str, or np.ndarray
Ordered list of unique elements. Atoms are assigned type numbers
based on the index of their element in this list (starting from 1).
Raises
------
AssertionError
If data does not contain 'element' column or if any element in
data is not present in the provided element list.
Examples
--------
>>> system.set_element(["Cu", "Cu", "Al", "Al"])
>>> system.set_type_by_element(["Cu", "Al"])
>>> print(system.data.select("element", "type"))
┌─────────┬──────┐
│ element │ type │
├─────────┼──────┤
│ Cu │ 1 │
│ Cu │ 1 │
│ Al │ 2 │
│ Al │ 2 │
└─────────┴──────┘
Notes
-----
This method creates or updates the 'type' column in the data DataFrame.
Type numbers start from 1 and correspond to the order in the element list.
"""
assert "element" in self.data.columns, "Data must contain element column."
for i in self.data["element"].unique():
assert i in element_list, (
f"element_list must include element {i!r} "
"(seen in data['element'])."
)
ele2type = {j: i for i, j in enumerate(element_list, start=1)}
self.update_data(
self.data.with_columns(
pl.col("element")
.replace_strict(ele2type, return_dtype=pl.Int32)
.rechunk()
.alias("type")
),
True,
)
[docs]
def get_positions(self, reduced: bool = False) -> pl.DataFrame:
"""
Extract atomic positions from the system.
Parameters
----------
reduced : bool, optional
If True, return fractional (reduced) coordinates in the box basis.
If False (default), return Cartesian coordinates.
Returns
-------
pl.DataFrame
DataFrame containing position columns:
* **reduced=False**: Columns 'x', 'y', 'z' (Cartesian coordinates)
* **reduced=True**: Columns 'r_x', 'r_y', 'r_z' (fractional coordinates)
Examples
--------
>>> system = System("POSCAR")
>>> cart_pos = system.get_positions(reduced=False)
>>> frac_pos = system.get_positions(reduced=True)
Notes
-----
Fractional coordinates are computed using the inverse box matrix:
reduced = positions @ box.inverse_box
"""
if reduced:
inv = self.box.inverse_box
# pos @ inv
return self.data.select(
r_x=pl.col("x") * inv[0, 0]
+ pl.col("y") * inv[1, 0]
+ pl.col("z") * inv[2, 0],
r_y=pl.col("x") * inv[0, 1]
+ pl.col("y") * inv[1, 1]
+ pl.col("z") * inv[2, 1],
r_z=pl.col("x") * inv[0, 2]
+ pl.col("y") * inv[1, 2]
+ pl.col("z") * inv[2, 2],
)
else:
return self.data.select("x", "y", "z")
[docs]
def get_velocities(self) -> pl.DataFrame:
"""
Extract atomic velocities from the system.
Returns
-------
pl.DataFrame
DataFrame with columns 'vx', 'vy', 'vz' containing velocity
components for each atom.
Raises
------
AssertionError
If the data does not contain velocity columns ('vx', 'vy', 'vz').
Examples
--------
>>> system = System("relax.dump") # File with velocities
>>> velocities = system.get_velocities()
"""
for i in ["vx", "vy", "vz"]:
assert i in self.data.columns
return self.data.select("vx", "vy", "vz")
[docs]
def set_pka(
self,
energy: float,
direction: np.ndarray,
index: Optional[int] = None,
element: Optional[str] = None,
factor: float = 1.0,
):
"""
Set primary knock-on atom (PKA) for radiation damage simulation.
This method assigns initial kinetic energy and direction to a PKA atom,
commonly used to initialize cascade simulations in radiation damage studies.
We assume that the velocity in the units of A/fs, you need to use `factor` to convert your velocity to this units.
Parameters
----------
energy : float
Kinetic energy to assign to the PKA (in eV).
direction : np.ndarray
1D array of length 3 specifying the velocity direction vector.
Will be normalized automatically.
index : int, optional
Index of the atom to set as PKA. If None, selects by nearest center atom.
element : str, optional
Element name to select PKA. If provided and index is None,
selects by nearest center atom with same element.
factor : float
Convert your velocity to the units of A/fs.
Defaults to 1.0.
Notes
-----
This method updates the velocity columns ('vx', 'vy', 'vz') in the
data DataFrame.
Examples
--------
Set PKA by atom index:
>>> system.set_pka(energy=1000.0, direction=np.array([1, 3, 5]), index=50)
Set PKA by element:
>>> system.set_pka(energy=2000.0, direction=np.array([1, 3, 5]), element="Fe")
"""
self.update_data(
tool._set_pka(
self.data.with_columns(
pl.col("vx") * factor, pl.col("vy") * factor, pl.col("vz") * factor
),
self.box,
energy,
direction,
index,
element,
).with_columns(
pl.col("vx") / factor, pl.col("vy") / factor, pl.col("vz") / factor
)
)
[docs]
def get_energy(self) -> float:
"""
Calculate total potential energy of the system.
Returns
-------
float
Total potential energy in the units eV.
Raises
------
AssertionError
If no calculator is attached to the system.
Examples
--------
>>> from mdapy.eam import EAM
>>> system = System("config.dump")
>>> system.calc = EAM("potential.eam")
>>> total_energy = system.get_energy()
"""
assert isinstance(self.calc, CalculatorMP), "Must assign a calc first."
return self.calc.get_energy(self.data, self.box)
[docs]
def get_energies(self) -> np.ndarray:
"""
Calculate per-atom potential energies in the units eV.
Returns
-------
np.ndarray
1D array of shape (N,) containing potential energy for each atom.
Raises
------
AssertionError
If no calculator is attached to the system.
"""
assert isinstance(self.calc, CalculatorMP), "Must assign a calc first."
return self.calc.get_energies(self.data, self.box)
[docs]
def get_force(self) -> np.ndarray:
"""
Calculate forces acting on each atom in the units eV/A.
Returns
-------
np.ndarray
2D array of shape (N, 3) containing force components [fx, fy, fz]
for each atom.
Raises
------
AssertionError
If no calculator is attached to the system.
Examples
--------
>>> from mdapy.eam import EAM
>>> system = System("config.dump")
>>> system.calc = EAM("potential.eam")
>>> forces = system.get_force()
>>> print(forces.shape)
(1000, 3)
"""
assert isinstance(self.calc, CalculatorMP), "Must assign a calc first."
return self.calc.get_forces(self.data, self.box)
[docs]
def get_stress(self) -> np.ndarray:
"""
Calculate stress tensor of the system in the units eV/A^3.
Returns
-------
np.ndarray
1D array of shape (6,) containing stress tensor components in
Voigt notation: [σ_xx, σ_yy, σ_zz, σ_yz, σ_xz, σ_xy].
Raises
------
AssertionError
If no calculator is attached to the system.
Examples
--------
>>> from mdapy.eam import EAM
>>> system = System("config.dump")
>>> system.calc = EAM("potential.eam")
>>> stress = system.get_stress()
>>> print(stress) # [σ_xx, σ_yy, σ_zz, σ_yz, σ_xz, σ_xy]
"""
assert isinstance(self.calc, CalculatorMP), "Must assign a calc first."
return self.calc.get_stress(self.data, self.box)
[docs]
def get_virials(self) -> np.ndarray:
"""
Calculate per-atom virial tensors in the units eV.
Returns
-------
np.ndarray
2D array of shape (N, 9) containing virial tensor components for
each atom, ordered as: [v_xx, v_xy, v_xz, v_yx, v_yy, v_yz, v_zx, v_zy, v_zz].
Raises
------
AssertionError
If no calculator is attached to the system.
Examples
--------
>>> from mdapy.eam import EAM
>>> system = System("config.dump")
>>> system.calc = EAM("potential.eam")
>>> virials = system.get_virials()
>>> print(virials.shape)
(1000, 9)
"""
assert isinstance(self.calc, CalculatorMP), "Must assign a calc first."
return self.calc.get_virials(self.data, self.box)
[docs]
def update_data(
self,
data: pl.DataFrame,
reset_calculator: bool = False,
reset_neighbor: bool = False,
reset_calcolator: Optional[bool] = None,
) -> None:
"""
Update the atomic data DataFrame.
Parameters
----------
data : pl.DataFrame
New DataFrame containing updated atomic information.
Should maintain the same structure (columns) as the original data.
reset_calculator : bool, optional
If True, clears cached results from the attached calculator.
Set to True when atomic positions or types/element change.
Default is False.
reset_neighbor : bool, optional
If True, clear all neighbor information.
Set to True when atomic positions change. Default is False.
reset_calcolator : bool, optional
*Deprecated.* Misspelled alias for ``reset_calculator`` kept
for backwards compatibility; emits a ``DeprecationWarning``
and will be removed in a future release.
Examples
--------
>>> # Add a new property column
>>> new_data = system.data.with_columns(
... temperature=np.random.rand(system.N) * 300
... )
>>> system.update_data(new_data)
>>> # Update positions (reset calculator)
>>> new_data = system.data.with_columns(x=system.data["x"] + 0.1)
>>> system.update_data(new_data, reset_calculator=True)
Notes
-----
When ``reset_calculator=True``, any cached energy, force, or stress
calculations are invalidated and will be recomputed on next access.
The DataFrame is rechunked into a single contiguous chunk on write.
Polars operations such as ``filter()`` or ``concat()`` can leave a
DataFrame with multiple internal chunks; downstream code that calls
``Series.to_numpy(allow_copy=False)`` then raises. Rechunking once at
the assignment boundary makes every consumer safe.
"""
if reset_calcolator is not None:
import warnings
warnings.warn(
"`reset_calcolator` is a misspelling and is deprecated; "
"use `reset_calculator` instead.",
DeprecationWarning,
stacklevel=2,
)
reset_calculator = reset_calculator or reset_calcolator
self.__data = data.rechunk()
if reset_calculator and isinstance(self.calc, CalculatorMP):
self.calc.results = {}
if reset_neighbor:
if hasattr(self, "verlet_list"):
del self.verlet_list, self.neighbor_number, self.distance_list
if hasattr(self, "rc"):
del self.rc
if hasattr(self, "bond"):
del self.bond
if hasattr(self, "voro_verlet_list"):
del (
self.voro_verlet_list,
self.voro_distance_list,
self.voro_face_area,
self.voro_neighbor_number,
)
if hasattr(self, "_enlarge_box"):
del self._enlarge_box, self._enlarge_data
def _get_compute_view(self) -> Tuple[Box, pl.DataFrame]:
"""Return ``(box, data)`` to feed to per-atom analysis kernels.
When :meth:`build_neighbor` (or :meth:`build_voronoi_neighbor` /
:meth:`build_nearest_neighbor`) detects a system whose box is smaller
than required, it transparently builds a replicated copy and stores
it as ``self._enlarge_box`` / ``self._enlarge_data``. Analysis
methods must then operate on the *enlarged* view (so neighbour
indices align), and the caller is responsible for slicing the
result back to ``self.N``.
Returns
-------
Tuple[Box, pl.DataFrame]
The enlarged view if one exists, otherwise the system's own box
and DataFrame.
"""
if hasattr(self, "_enlarge_data"):
return self._enlarge_box, self._enlarge_data
return self.box, self.data
[docs]
def update_box(
self,
box: Union[int, float, Iterable[float], np.ndarray, Box],
scale_pos: bool = False,
) -> None:
"""
Update the simulation box.
Parameters
----------
box : int, float, Iterable[float], np.ndarray, or Box
New box specification. See :class:`~mdapy.box.Box` for accepted formats.
scale_pos : bool, optional
If True, scales atomic positions affinely with the box change.
Useful for applying strain or changing lattice constants while
maintaining fractional coordinates. Default is False.
Raises
------
AssertionError
If `scale_pos=True` but not all boundaries are periodic.
Examples
--------
Change box without scaling positions:
>>> system.update_box([20, 20, 20])
Apply uniform strain by scaling box and positions:
>>> system.update_box([11, 11, 11], scale_pos=True)
Notes
-----
When `scale_pos=True`, the transformation is:
new_pos = old_pos @ (old_box^-1 @ new_box)
This preserves fractional coordinates in the new box.
"""
new_box = Box(box)
if scale_pos:
assert sum(new_box.boundary) == 3, (
"only support all periodic boundary condition."
)
map_matrix = np.linalg.solve(self.box.box, new_box.box)
self.__data = self.__data.with_columns(
pl.col("x") - self.box.origin[0],
pl.col("y") - self.box.origin[1],
pl.col("z") - self.box.origin[2],
).with_columns(
x=pl.col("x") * map_matrix[0, 0]
+ pl.col("y") * map_matrix[1, 0]
+ pl.col("z") * map_matrix[2, 0]
+ new_box.origin[0],
y=pl.col("x") * map_matrix[0, 1]
+ pl.col("y") * map_matrix[1, 1]
+ pl.col("z") * map_matrix[2, 1]
+ new_box.origin[1],
z=pl.col("x") * map_matrix[0, 2]
+ pl.col("y") * map_matrix[1, 2]
+ pl.col("z") * map_matrix[2, 2]
+ new_box.origin[2],
)
# The `box` setter handles all neighbor / calculator cache
# invalidation (see the property definition at the top of the class).
self.box = new_box
[docs]
def wrap_pos(self) -> None:
"""Wrap positions into box for PBC boundary."""
self.update_data(tool.wrap_pos(self.data, self.box), True, True)
[docs]
def replicate(self, nx: int, ny: int, nz: int) -> None:
"""
Replicate the system by creating a supercell.
Parameters
----------
nx : int
Number of replications along x-direction.
ny : int
Number of replications along y-direction.
nz : int
Number of replications along z-direction.
Examples
--------
Create a 2x2x2 supercell:
>>> system = System("unit_cell.poscar")
>>> system.replicate(2, 2, 2)
>>> print(system.N) # 8 times the original number
Notes
-----
This method modifies the system in place. The box is scaled by
[nx, ny, nz] and atoms are replicated accordingly.
"""
assert nx > 0
assert ny > 0
assert nz > 0
data, box = tool.replicate(self.__data, self.box, int(nx), int(ny), int(nz))
self.update_data(data, True, True)
self.update_box(box)
[docs]
def to_ovito(self) -> "DataCollection":
"""
Convert system to OVITO DataCollection object.
Returns
-------
ovito.data.DataCollection
OVITO data structure containing the atomic configuration.
Raises
------
ImportError
If OVITO is not installed.
Examples
--------
>>> data = system.to_ovito()
>>> # Use OVITO's analysis or visualization tools
>>> from ovito.modifiers import CommonNeighborAnalysisModifier
>>> data.apply(CommonNeighborAnalysisModifier())
"""
return SaveSystem.to_ovito(self.__data, self.box, self.__global_info)
[docs]
def to_ase(self) -> "Atoms":
"""
Convert system to ASE Atoms object.
Returns
-------
ase.Atoms
ASE Atoms object containing the atomic configuration.
Raises
------
ImportError
If ASE is not installed.
AssertionError
If data does not contain an 'element' column.
Examples
--------
>>> atoms = system.to_ase()
>>> # Use ASE's functionality
>>> from ase.io import write
>>> write("output.xyz", atoms)
"""
return SaveSystem.to_ase(self.__data, self.box)
[docs]
def write_mp(self, output_name: str) -> None:
"""
Save system to mdapy `.mp` (parquet-based) format.
Parameters
----------
output_name : str
Output file path. The `.mp` extension is recommended.
Notes
-----
The `.mp` format round-trips `box`, the full per-atom DataFrame, and
`global_info` (including non-numeric metadata) without loss.
Examples
--------
>>> system.write_mp("config.mp")
"""
SaveSystem.write_mp(output_name, self.data, self.box, self.global_info)
[docs]
def write_xyz(self, output_name: str, classical: bool = False, compress=False):
"""
Save the system to XYZ format.
Parameters
----------
output_name : str
Output file path. ``.xyz`` (or ``.xyz.gz`` with ``compress=True``)
is the conventional extension.
classical : bool, optional
If True, write the legacy 4-column XYZ ``element x y z`` form
(no Properties / Lattice metadata). Defaults to False, i.e.
extended XYZ that round-trips box, velocities, forces, etc.
compress : bool, optional
If True, gzip the output (``.gz`` is appended if missing).
Defaults to False.
Notes
-----
The system's ``global_info`` is forwarded to the XYZ comment line as
extra metadata keys. Existing keys (``Lattice``, ``Properties``,
``pbc``) are reserved by the writer and must not appear in
``global_info``.
"""
SaveSystem.write_xyz(
output_name, self.box, self.data, classical, compress, **self.global_info
)
[docs]
def write_poscar(
self,
output_name: str,
reduced_pos: bool = False,
selective_dynamics: bool = False,
compress=False,
):
"""
Save the system to a VASP POSCAR file.
Parameters
----------
output_name : str
Output file path.
reduced_pos : bool, optional
If True, write fractional ``Direct`` coordinates instead of
Cartesian. Defaults to False.
selective_dynamics : bool, optional
If True, append a ``Selective dynamics`` block. The data must
contain ``sdx``, ``sdy``, ``sdz`` boolean-flag columns.
Defaults to False.
compress : bool, optional
If True, gzip the output. Defaults to False.
Notes
-----
Atoms are grouped by element in ``self.data``'s row order; if you
need a specific element ordering, sort the DataFrame first.
"""
SaveSystem.write_poscar(
output_name,
self.box,
self.data,
reduced_pos=reduced_pos,
selective_dynamics=selective_dynamics,
compress=compress,
)
[docs]
def write_data(
self,
output_name: str,
element_list: Optional[List[str]] = None,
num_type: Optional[int] = None,
data_format: str = "atomic",
compress=False,
):
"""
Save the system to a LAMMPS data file.
Parameters
----------
output_name : str
Output file path.
element_list : list of str, optional
Element ordering for the LAMMPS ``Masses`` section. When the
system has an ``element`` column, the LAMMPS ``type`` column is
derived locally from this ordering (the System's own DataFrame
is *not* mutated by this call). Element names not present in
the data raise ``AssertionError``. If the data already has a
``type`` column and no ``element`` column, the existing types
are used and ``element_list`` is recorded only in the
``Masses`` block.
num_type : int, optional
Total number of atom types declared in the file. Defaults to
``data['type'].max()``. Must be at least that maximum.
data_format : str, optional
``'atomic'`` (id type x y z) or ``'charge'`` (id type q x y z).
Defaults to ``'atomic'``.
compress : bool, optional
If True, gzip the output. Defaults to False.
"""
# When element_list is provided AND the data has an element column,
# derive the LAMMPS `type` column from the element ordering — but on
# a *local copy* so the System's own DataFrame is not mutated by a
# write call.
data = self.data
if element_list is not None and "element" in data.columns:
for ele in data["element"].unique():
assert ele in element_list, (
f"element_list must include element {ele!r}."
)
ele2type = {j: i for i, j in enumerate(element_list, start=1)}
data = data.with_columns(
pl.col("element")
.replace_strict(ele2type, return_dtype=pl.Int32)
.rechunk()
.alias("type")
)
SaveSystem.write_data(
output_name,
self.box,
data,
element_list=element_list,
num_type=num_type,
data_format=data_format,
compress=compress,
)
[docs]
def write_dump(self, output_name: str, timestep: float = 0, compress=False):
"""
Save the system to a single-frame LAMMPS dump file.
Parameters
----------
output_name : str
Output file path. ``.dump`` / ``.lammpstrj`` are conventional.
timestep : float, optional
Value written into the ``ITEM: TIMESTEP`` header. Defaults to 0.
compress : bool, optional
If True, gzip the output. Defaults to False.
Notes
-----
For multi-frame trajectories use :class:`mdapy.Trajectory` instead.
Numeric columns plus the ``element`` string column are preserved;
other non-numeric columns are dropped.
"""
SaveSystem.write_dump(
output_name, self.box, self.data, timestep=timestep, compress=compress
)
[docs]
def build_neighbor(
self,
rc: float,
max_neigh: Optional[int] = None,
) -> None:
"""
Build neighbor list for the system.
This method constructs a Verlet neighbor list within a cutoff radius,
which is required for many analysis methods. For small systems, the
box may be automatically replicated to ensure sufficient neighbors.
Parameters
----------
rc : float
Cutoff radius for neighbor search (in simulation units).
max_neigh : int, optional
Maximum number of neighbors per atom. If None, automatically
determined based on the actual neighbor number Increase this if you get warnings
about insufficient neighbor slots.
Attributes Set
--------------
verlet_list : np.ndarray
2D array of shape (N, max_neigh) containing neighbor indices.
Unfilled slots contain -1.
distance_list : np.ndarray
2D array of shape (N, max_neigh) containing distances to neighbors.
neighbor_number : np.ndarray
1D array of shape (N,) containing number of neighbors for each atom.
rc : float
The cutoff radius used.
_enlarge_box : Box, optional
replicated box for small system.
_enlarge_data : pl.DataFrame, optional
replicated atom dataframe for small system
Examples
--------
>>> system.build_neighbor(rc=5.0)
>>> print(system.verlet_list.shape)
(1000, 50) # 1000 atoms, up to 50 neighbors each
>>> print(system.neighbor_number)
[12 12 12 ... 12 12 12] # FCC coordination
See :class:`~mdapy.neighbor.Neighbor` for implementation details.
"""
neigh = Neighbor(rc, self.box, self.data, max_neigh)
neigh.compute()
self.rc = rc
if hasattr(neigh, "_enlarge_box"):
self._enlarge_box: Box = neigh._enlarge_box
if hasattr(neigh, "_enlarge_data"):
self._enlarge_data: pl.DataFrame = neigh._enlarge_data
self.verlet_list, self.distance_list, self.neighbor_number = (
neigh.verlet_list,
neigh.distance_list,
neigh.neighbor_number,
)
[docs]
def build_voronoi_neighbor(
self, a_face_area_threshold: float = -1.0, r_face_area_threshold: float = -1.0
) -> None:
"""
Calculate Voronoi neighbors and face properties for all atoms.
This method performs Voronoi tessellation to identify nearest neighbors
based on shared Voronoi cell faces. It can filter neighbors based on
face area thresholds to exclude insignificant contacts.
Parameters
----------
a_face_area_threshold : float, optional
Absolute face area threshold.
Faces with area below this value are ignored as neighbors.
Default is -1.0 (no filtering).
r_face_area_threshold : float, optional
Relative face area threshold (fraction of average face area).
Faces with relative area below this value are ignored.
Default is -1.0 (no filtering).
Attributes Set
--------------
voro_verlet_list : np.ndarray
2D array of shape (N, max_neigh) containing neighbor indices.
Unfilled slots contain -1.
voro_distance_list : np.ndarray
2D array of shape (N, max_neigh) containing distances to neighbors.
voro_neighbor_number : np.ndarray
1D array of shape (N,) containing number of neighbors for each atom.
voro_face_area : np.ndarray
2D array of shape (N, max_neighbors) containing the area of
the Voronoi face shared with each neighbor.
_enlarge_box : Box, optional
replicated box for small system.
_enlarge_data : pl.DataFrame, optional
replicated atom dataframe for small system
Examples
--------
>>> system.build_voronoi_neighbor()
>>> print(system.voro_verlet_list.shape)
(1000, 50) # 1000 atoms, up to 50 neighbors each
See :class:`~mdapy.voronoi.Voronoi` for implementation details.
"""
vor = Voronoi(self.box, self.data)
(
self.voro_verlet_list,
self.voro_distance_list,
self.voro_face_area,
self.voro_neighbor_number,
) = vor.get_neighbor(a_face_area_threshold, r_face_area_threshold)
if hasattr(vor, "_enlarge_box"):
self._enlarge_box: Box = vor._enlarge_box
if hasattr(vor, "_enlarge_data"):
self._enlarge_data: pl.DataFrame = vor._enlarge_data
[docs]
def build_nearest_neighbor(self, k: int) -> None:
"""
Calculate `k` nearest neighbor information.
Parameters
----------
k : int
Maximum number of neighbors to find per atom.
Attributes Set
--------------
verlet_list : np.ndarray
2D array of shape (N, k) containing neighbor indices.
distance_list : np.ndarray
2D array of shape (N, k) containing distances to neighbors.
neighbor_number : np.ndarray
1D array of shape (N,) containing number of neighbors for each atom.
_enlarge_box : Box, optional
replicated box for small system.
_enlarge_data : pl.DataFrame, optional
replicated atom dataframe for small system
Examples
--------
>>> system.build_nearest_neighbor(12)
>>> print(system.verlet_list.shape)
(1000, 12) # 1000 atoms, up to 12 sorted neighbors each
See :class:`~mdapy.knn.NearestNeighbor` for implementation details.
"""
kdt = NearestNeighbor(self.data, self.box, k)
kdt.compute()
if hasattr(kdt, "_enlarge_box"):
self._enlarge_box: Box = kdt._enlarge_box
if hasattr(kdt, "_enlarge_data"):
self._enlarge_data: pl.DataFrame = kdt._enlarge_data
self.verlet_list, self.distance_list = kdt.indices_py, kdt.distances_py
self.neighbor_number = np.full(self.verlet_list.shape[0], k, np.int32)
@staticmethod
def _normalize_bond_cutoff(
rc: Union[float, Dict[Tuple[Union[int, str], Union[int, str]], float], np.ndarray],
data: pl.DataFrame,
) -> Tuple[float, np.ndarray, np.ndarray]:
if np.isscalar(rc):
rc = float(rc)
assert rc > 0, "rc should be larger than 0."
return rc, np.zeros(data.shape[0], np.int32), np.array([[rc]], float)
if isinstance(rc, dict):
assert len(rc) > 0, "pairwise rc should not be empty."
first_pair = next(iter(rc))
assert len(first_pair) == 2, "pairwise rc key should have two entries."
first_value = first_pair[0]
if isinstance(first_value, str):
assert "element" in data.columns, (
"Data must contain element column for element-pair bond cutoff."
)
labels = data["element"].to_numpy()
unique_labels = np.unique(labels)
else:
assert "type" in data.columns, (
"Data must contain type column for type-pair bond cutoff."
)
labels = data["type"].to_numpy(allow_copy=False)
unique_labels = np.unique(labels)
compact_type = np.searchsorted(unique_labels, labels).astype(np.int32)
ntype = unique_labels.shape[0]
cutoff_matrix = np.full((ntype, ntype), -1.0, float)
label_to_index = {value: idx for idx, value in enumerate(unique_labels.tolist())}
for pair, cutoff in rc.items():
assert len(pair) == 2, "pairwise rc key should have two type indices."
i, j = pair[0], pair[1]
assert i in label_to_index and j in label_to_index, (
f"type/element pair {(i, j)} is not in current system."
)
cutoff = float(cutoff)
assert cutoff > 0, "pairwise rc should be larger than 0."
ii, jj = label_to_index[i], label_to_index[j]
cutoff_matrix[ii, jj] = cutoff
cutoff_matrix[jj, ii] = cutoff
if np.any(cutoff_matrix < 0):
missing_pairs = []
for i in range(ntype):
for j in range(i, ntype):
if cutoff_matrix[i, j] < 0:
missing_pairs.append((unique_labels[i], unique_labels[j]))
raise ValueError(f"Missing rc for type pairs: {missing_pairs}")
else:
assert "type" in data.columns, (
"Data must contain type column for matrix bond cutoff."
)
type_list = data["type"].to_numpy(allow_copy=False)
unique_labels = np.unique(type_list)
compact_type = np.searchsorted(unique_labels, type_list).astype(np.int32)
ntype = unique_labels.shape[0]
cutoff_matrix = np.asarray(rc, float)
assert cutoff_matrix.ndim == 2, "pairwise rc should be a 2D array."
assert cutoff_matrix.shape == (ntype, ntype), (
f"pairwise rc shape should be {(ntype, ntype)}."
)
assert np.all(cutoff_matrix > 0), "pairwise rc should be larger than 0."
return float(cutoff_matrix.max()), compact_type, cutoff_matrix
[docs]
def create_bonds(
self,
rc: Union[
float,
Dict[Tuple[Union[int, str], Union[int, str]], float],
np.ndarray,
],
max_neigh: Optional[int] = None,
) -> np.ndarray:
"""
Build bond pairs from neighbor information.
Parameters
----------
rc : float, dict, or np.ndarray
Bond cutoff definition. If a float is given, all bonds use the same
cutoff. If a dict is given, it should map ``(type_i, type_j)`` or
``(element_i, element_j)`` to the corresponding cutoff. If a 2D
array is given, its rows and columns follow the sorted unique atom
types in the system.
max_neigh : int, optional
Maximum number of neighbors per atom when building the neighbor list.
Returns
-------
np.ndarray
A 2D integer array of shape ``(Nbond, 2)`` containing 0-based atom
indices for each bond pair.
Notes
-----
This method stores the result in ``self.bond`` and also returns it.
"""
if np.isscalar(rc):
max_rc = float(rc)
else:
if isinstance(rc, dict):
max_rc = float(np.max(np.asarray(list(rc.values()), float)))
else:
assert "type" in self.data.columns, (
"Data must contain type column for matrix bond cutoff."
)
max_rc = float(np.max(np.asarray(rc, float)))
assert max_rc > 0, "rc should be larger than 0."
has_neigh = False
if hasattr(self, "rc") and self.rc >= max_rc:
has_neigh = True
if not has_neigh:
self.build_neighbor(max_rc, max_neigh)
_, data = self._get_compute_view()
_, compact_type, cutoff_matrix = self._normalize_bond_cutoff(rc, data)
bond = np.asarray(
_build_bond_pairs(
self.verlet_list,
self.distance_list,
self.neighbor_number,
compact_type,
cutoff_matrix,
),
dtype=np.int32,
)
if bond.size == 0:
self.bond = np.empty((0, 2), np.int32)
return self.bond
if hasattr(self, "_enlarge_data"):
bond %= self.N
bond.sort(axis=1)
bond = bond[bond[:, 0] != bond[:, 1]]
if bond.size == 0:
self.bond = np.empty((0, 2), np.int32)
return self.bond
self.bond = np.unique(bond, axis=0)
return self.bond
[docs]
def delete_overlap(self, rc: float, max_neigh: Optional[int] = None) -> int:
"""
Delete overlapping atoms that are closer than ``rc`` to another atom.
For every pair of atoms within ``rc`` (respecting periodic
boundary conditions), the atom with the **larger** index is
removed and the one with the smaller index is kept. An atom that
has already been removed cannot itself trigger further removals,
so each atom is deleted at most once and clusters of mutually
overlapping atoms collapse to their single lowest-index member.
Parameters
----------
rc : float
Distance threshold. Atom pairs separated by less than ``rc``
are considered overlapping.
max_neigh : int, optional
Maximum number of neighbors per atom when building the
neighbor list. If None, the buffer is sized automatically.
Returns
-------
int
The number of atoms removed.
Notes
-----
This method modifies ``self.data`` in place; neighbor lists and
cached calculator results are reset because atom indices change.
Examples
--------
>>> n_removed = system.delete_overlap(0.1)
>>> print(f"removed {n_removed} overlapping atoms")
"""
rc = float(rc)
assert rc > 0, "rc should be larger than 0."
N = self.N
has_neigh = hasattr(self, "rc") and self.rc >= rc
if not has_neigh:
self.build_neighbor(rc, max_neigh)
verlet = self.verlet_list
distance = self.distance_list
neigh_num = self.neighbor_number
# When the box was too small for `rc`, the neighbor list indexes
# into the enlarged (replicated) system; map those indices back
# to the original atoms. Cell 0 holds the originals at indices
# 0..N-1, so `enlarged_index % N` recovers the original atom.
verlet_orig = verlet % N if hasattr(self, "_enlarge_data") else verlet
# Sequential sweep in index order: atom j is removed iff it has a
# still-surviving neighbor with a smaller index. Because lower
# indices are finalized before we reach j, one pass suffices.
deleted = np.zeros(N, dtype=bool)
for j in range(N):
cnt = int(neigh_num[j])
if cnt == 0:
continue
nbr = verlet_orig[j, :cnt]
d = distance[j, :cnt]
smaller = nbr[(nbr < j) & (d < rc)]
if smaller.size and not deleted[smaller].all():
deleted[j] = True
n_removed = int(deleted.sum())
if n_removed == 0:
return 0
keep = ~deleted
self.update_data(
self.data.filter(keep),
reset_calculator=True,
reset_neighbor=True,
)
return n_removed
[docs]
def cal_identify_diamond_structure(self) -> None:
"""
Identify diamond structure atoms. This will generate 'ids' column in self.data:
- 0 Other
- 1 Cubic diamond
- 2 Cubic diamond (1st neighbor)
- 3 Cubic diamond (2nd neighbor)
- 4 Hexagonal diamond
- 5 Hexagonal diamond (1st neighbor)
- 6 Hexagonal diamond (2nd neighbor)
Notes
-----
See :class:`~mdapy.identify_diamond_structure.IdentifyDiamondStructure`
for implementation details.
"""
verlet_list = None
safe_L = 15
repeat = np.ceil(safe_L / self.box.get_thickness()).astype(int)
for i in range(3):
if self.box.boundary[i] == 0:
repeat[i] = 1
if sum(repeat) == 3:
if hasattr(self, "neighbor_number"):
if self.neighbor_number.min() >= 4:
tool.sort_neighbor(
self.verlet_list, self.distance_list, self.neighbor_number, 4
)
verlet_list = self.verlet_list
box, data = self._get_compute_view()
ids = IdentifyDiamondStructure(data, box, verlet_list)
ids.compute()
self.update_data(self.__data.with_columns(ids=ids.pattern[: self.N]))
[docs]
def cal_chill_plus(self, cutoff: float = 3.5) -> None:
"""
Run the CHILL+ water-phase identification on this system. The
result is written to ``self.data['chill_plus']``:
- 0 Other
- 1 Hexagonal ice
- 2 Cubic ice
- 3 Interfacial ice
- 4 Gas hydrate
- 5 Interfacial gas hydrate
The system must contain only the molecule centres (e.g. oxygen atoms
or coarse-grained water beads); strip hydrogens beforehand.
Parameters
----------
cutoff : float, optional
O-O cutoff radius in Å. Default 3.5.
Notes
-----
See :class:`~mdapy.chill_plus.ChillPlus` for implementation details.
"""
has_neigh = hasattr(self, "rc") and self.rc >= cutoff
if not has_neigh:
self.build_neighbor(cutoff)
box, data = self._get_compute_view()
cp = ChillPlus(
data,
box,
cutoff,
self.verlet_list,
self.distance_list,
self.neighbor_number,
)
cp.compute()
self.update_data(
self.__data.with_columns(chill_plus=cp.pattern[: self.N])
)
[docs]
def cal_common_neighbor_parameter(
self, rc: float, max_neigh: Optional[int] = None
) -> None:
"""
Calculate common neighbor parameter for structure analysis. This will save results to self.data['cnp'].
Parameters
----------
rc : float
Cutoff radius for neighbor search.
max_neigh : int, optional
Maximum number of neighbors per atom.
Notes
-----
See :class:`~mdapy.common_neighbor_parameter.CommonNeighborParameter`
for implementation details.
"""
has_neigh = False
if hasattr(self, "rc"):
if self.rc >= rc:
has_neigh = True
if not has_neigh:
self.build_neighbor(rc, max_neigh)
box, data = self._get_compute_view()
cnp = CommonNeighborParameter(
data, box, rc, self.verlet_list, self.distance_list, self.neighbor_number
)
cnp.compute()
self.update_data(self.__data.with_columns(cnp=cnp.cnp[: self.N]))
[docs]
def cal_ackland_jones_analysis(self) -> None:
"""
Perform Ackland-Jones structure analysis. This will save results to self.data['aja']:
- 0 = Other (unknown coordination)
- 1 = FCC (face-centered cubic)
- 2 = HCP (hexagonal close-packed)
- 3 = BCC (body-centered cubic)
- 4 = ICO (icosahedral coordination)
Notes
-----
See :class:`~mdapy.ackland_jones_analysis.AcklandJonesAnalysis`
for implementation details.
"""
N_neigh = 14
if self.data.shape[0] < N_neigh and sum(self.box.boundary) == 0:
self.update_data(self.__data.with_columns(pl.lit(0, pl.Int32).alias("aja")))
return
if hasattr(self, "neighbor_number") and self.neighbor_number.min() >= N_neigh:
tool.sort_neighbor(
self.verlet_list, self.distance_list, self.neighbor_number, N_neigh
)
else:
self.build_nearest_neighbor(N_neigh)
box, data = self._get_compute_view()
aja = AcklandJonesAnalysis(data, box, self.verlet_list, self.distance_list)
aja.compute()
self.update_data(self.__data.with_columns(aja=aja.aja[: self.N]))
[docs]
def cal_warren_cowley_parameter(
self, rc: float, max_neigh: Optional[int] = None
) -> WarrenCowleyParameter:
"""
Calculate Warren-Cowley short-range order parameter.
Parameters
----------
rc : float
Cutoff radius for neighbor search.
max_neigh : int, optional
Maximum number of neighbors per atom.
Returns
-------
WarrenCowleyParameter
Object containing Warren-Cowley parameters.
Notes
-----
See :class:`~mdapy.warren_cowley_parameter.WarrenCowleyParameter`
for implementation details and returned attributes.
Examples
--------
>>> wcp = system.cal_warren_cowley_parameter(rc=3.0)
>>> print(wcp.wcp) # Access Warren-Cowley parameters
"""
has_neigh = False
if hasattr(self, "rc"):
if self.rc >= rc:
has_neigh = True
if not has_neigh:
self.build_neighbor(rc, max_neigh)
_, data = self._get_compute_view()
wcp = WarrenCowleyParameter(self.verlet_list, self.neighbor_number, data)
wcp.compute()
return wcp
[docs]
def cal_atomic_temperature(
self, rc: float, factor: float = 1.0, max_neigh: Optional[int] = None
) -> None:
"""
Calculate the local atomic temperature for each atom by analyzing the velocity fluctuations of the atom and its neighbors.
We assume velocity in units of Å/fs, you can use `factor` to convert your velocity.
This will save results to self.data['atomic_temp'] in the unit of K.
Parameters
----------
rc : float
Cutoff radius for local averaging.
factor : float
Scaling factor for velocities (e.g., for unit conversion).
Default is 1.0.
max_neigh : int, optional
Maximum number of neighbors per atom.
Notes
-----
See :class:`~mdapy.atomic_temperature.AtomicTemperature` for
implementation details.
"""
has_neigh = False
if hasattr(self, "rc"):
if self.rc >= rc:
has_neigh = True
if not has_neigh:
self.build_neighbor(rc, max_neigh)
_, data = self._get_compute_view()
atomTemp = AtomicTemperature(
data, self.verlet_list, self.distance_list, rc, factor
)
atomTemp.compute()
self.update_data(self.data.with_columns(atomic_temp=atomTemp.T[: self.N]))
[docs]
def cal_steinhardt_bond_orientation(
self,
llist: Union[np.ndarray, List[int]],
use_voronoi: bool = False,
nnn: int = 0,
rc: float = -1.0,
average: bool = False,
use_weight: bool = False,
weight: Optional[np.ndarray] = None,
wl: bool = False,
wlhat: bool = False,
a_face_area_threshold: float = -1,
r_face_area_threshold: float = -1,
identify_liquid: bool = False,
threshold: float = 0.7,
n_bond: int = 7,
max_neigh: Optional[int] = None,
) -> None:
r"""Calculate Steinhardt bond orientation order parameters. One can also use it to identify the solid or liquid atom.
Parameters
----------
llist : np.ndarry or List[int]
Degrees list, such as [4, 6, 8], should be positive, even integer.
use_voronoi : bool
If True, compute using Voronoi neighbor. Default is False.
nnn : int
If `use_voronoi` is False and this value is postive, compute using `nnn` nearest neighbors. Default is 0.
rc : float
If `use_voronoi` is False and `nnn` is zero, make sure this parameter is postive, and use it to build neighbor. Default is -1.0.
average : bool
If True, qlm will be averaged. Default is False.
use_weight : bool
If True, the neighbor will be weighed by `weight` array, otherwise the weight of each neighbor is 1.0. Default is False.
weight : np.ndarray, optional
If it is not None, it should be has the same shape with the `self.verlet_list`.
If it is None and the `use_voronoi` is True, then the voronoi_face_area will be used as weight. Default is None.
wl : bool
If True, compute third-order invariant :math:`w_l` parameters. Default is False.
wlhat : bool
If True, compute normalized third-order invariant :math:`\hat{w}_l` parameters. Default is False.
a_face_area_threshold : float
Absolute face area threshold.
Faces with area below this value are ignored as neighbors.
Default is -1.0 (no filtering).
r_face_area_threshold : float
Relative face area threshold (fraction of average face area).
Faces with relative area below this value are ignored.
Default is -1.0 (no filtering).
identify_liquid : bool
Enable solid-liquid classification (requires :math:`l=6` in llist).
threshold : float
Threshold for solid-liquid identification (default: 0.7 for normalized :math:`q_6`).
n_bond : int
Minimum number of "solid-like" bonds for solid classification. Default to 7.
max_neigh : int, optional
Maximum number of neighbors per atom.
Notes
-----
See :class:`~mdapy.steinhardt_bond_orientation.SteinhardtBondOrientation`
for implementation details. The results will add to self.data['ql{l}'] for l in `llist`.
If set `wl` is True, also will add self.data['wl{l}']. If set `wlhat` is True, also will add
self.data['wlh{l}']. If set `identify_liquid`, it will add self.data['solidliquid'] and self.data['nbond'] results.
"""
if use_voronoi:
self.build_voronoi_neighbor(a_face_area_threshold, r_face_area_threshold)
verlet_list, distance_list, neighbor_number = (
self.voro_verlet_list,
self.voro_distance_list,
self.voro_neighbor_number,
)
if use_weight and weight is None:
weight = self.voro_face_area
else:
if nnn > 0:
has_sort_neigh = False
if hasattr(self, "neighbor_number"):
if self.neighbor_number.min() >= nnn:
tool.sort_neighbor(
self.verlet_list,
self.distance_list,
self.neighbor_number,
nnn,
)
has_sort_neigh = True
if not has_sort_neigh:
self.build_nearest_neighbor(nnn)
else:
assert rc > 0, (
"At least use voronoi, or set positive nnn, or positive rc."
)
if hasattr(self, "rc"):
if self.rc < rc:
self.build_neighbor(rc, max_neigh)
else:
self.build_neighbor(rc, max_neigh)
verlet_list, distance_list, neighbor_number = (
self.verlet_list,
self.distance_list,
self.neighbor_number,
)
box, data = self._get_compute_view()
SBO = SteinhardtBondOrientation(
box,
data,
np.asarray(llist, int),
nnn,
rc,
average,
use_voronoi,
use_weight,
weight,
verlet_list,
distance_list,
neighbor_number,
wl,
wlhat,
identify_liquid,
threshold,
n_bond,
)
SBO.compute()
new_data = self.data
if SBO.qnarray.shape[1] > 1:
columns = [f"ql{i}" for i in llist]
if wl:
columns.extend(f"wl{i}" for i in llist)
if wlhat:
columns.extend(f"wlh{i}" for i in llist)
for i, name in enumerate(columns):
new_data = new_data.with_columns(
pl.lit(SBO.qnarray[: self.N, i]).alias(name)
)
else:
new_data = new_data.with_columns(
pl.lit(SBO.qnarray.flatten()[: self.N]).alias(f"ql{llist[0]}")
)
if identify_liquid:
new_data = new_data.with_columns(
pl.lit(SBO.solidliquid[: self.N]).alias("solidliquid"),
pl.lit(SBO.nbond[: self.N]).alias("nbond"),
)
self.update_data(new_data)
[docs]
def cal_polyhedral_template_matching(
self,
structure="fcc-hcp-bcc",
rmsd_threshold=0.1,
return_ordering=False,
return_rmsd=False,
return_atomic_distance=False,
return_orientation=False,
identify_fcc_planar_faults=False,
identify_esf=True,
):
"""
Identify crystal structures using polyhedral template matching, one can also further distiguish the planar defects in FCC structures.
For PTM, structure results will add as self.data['ptm'] and self.data['ordering']
- Structure type (integer, 0=Other, 1=FCC, 2=HCP, 3=BCC, 4=ICO, 5=SC, 6=DCUB, 7=DHEX, 8=Graphene)
- Ordering type (interger, 0=Other, 1=L10, 2=L12 (A-site), 3=L12 (B-site), 4=B2, 5=zincblende / wurtzite)
For planar defects, structure results will add as self.data['pft']
- 0: Non-hcp atoms (e.g. perfect fcc or disordered)
- 1: Indeterminate hcp-like (isolated hcp-like atoms, not forming a planar defect)
- 2: Intrinsic stacking fault (ISF, two adjacent hcp-like layers)
- 3: Coherent twin boundary (TB, one hcp-like layer)
- 4: Multi-layer stacking fault (three or more adjacent hcp-like layers)
- 5: Extrinsic Stacking Fault (ESF, if cal_esf=True)
Parameters
----------
structure : str
String specifying the structure types to consider, separated by hyphens (Defaults is "fcc-hcp-bcc").
Supported values: "fcc", "hcp", "bcc", "ico", "sc", "dcub", "dhex", "graphene".
Special values: "all" (all types), "default" (fcc, hcp, bcc).
rmsd_threshold : float
Maximum RMSD for a valid structure match. Particles exceeding this are classified as "Other".
Default is 0.1.
return_ordering : bool
If True, return the ordering type.
Default is False.
return_rmsd : bool
If True, return the RMSD value.
Default is False.
return_atomic_distance : bool
If True, return the interatomic distance.
Default is False.
return_orientation : bool
If True, return the orientation value.
Default is False.
identify_fcc_planar_faults : bool
If True, further detect the plannar defects.
Default is False.
identify_esf : bool
If True, further defect the ESF.
Default is True.
Notes
-----
See :class:`~mdapy.polyhedral_template_matching.PolyhedralTemplateMatching` and class:`~mdapy.identify_fcc_planar_faults.IdentifyFccPlanarFaults`
for implementation details.
"""
verlet_list = None
safe_L = 15
repeat = np.ceil(safe_L / self.box.get_thickness()).astype(int)
for i in range(3):
if self.box.boundary[i] == 0:
repeat[i] = 1
if sum(repeat) == 3:
if hasattr(self, "neighbor_number"):
if self.neighbor_number.min() >= 18:
tool.sort_neighbor(
self.verlet_list, self.distance_list, self.neighbor_number, 18
)
verlet_list = self.verlet_list
box, data = self._get_compute_view()
ptm = PolyhedralTemplateMatching(
structure, data, box, rmsd_threshold, verlet_list
)
ptm.compute()
output = ptm.output[: self.N]
data = self.__data.with_columns(pl.lit(output[:, 0], pl.Int32).alias("ptm"))
if return_ordering:
data = data.with_columns(pl.lit(output[:, 1]).alias("ordering"))
if return_rmsd:
data = data.with_columns(pl.lit(output[:, 2]).alias("rmsd"))
if return_atomic_distance:
data = data.with_columns(pl.lit(output[:, 3]).alias("interatomic_distance"))
if return_orientation:
data = data.with_columns(
qx=output[:, 5],
qy=output[:, 6],
qz=output[:, 7],
qw=output[:, 4],
)
if identify_fcc_planar_faults:
structure_types = np.array(ptm.output[:, 0], np.int32)
ptm_indices = np.ascontiguousarray(ptm.ptm_indices[:, 1:13])
ifpt = IdentifyFccPlanarFaults(structure_types, ptm_indices, identify_esf)
ifpt.compute()
data = data.with_columns(pl.lit(ifpt.fault_types[: self.N]).alias("pft"))
self.update_data(data)
[docs]
def cal_centro_symmetry_parameter(self, N: int):
"""
Calculate centro-symmetry parameter for defect identification. The results will be added as self.data['csp'].
Parameters
----------
N : int
Number of nearest neighbors to consider, should be a positive, even integer. 12 for FCC and 8 for BCC.
Notes
-----
See :class:`~mdapy.centro_symmetry_parameter.CentroSymmetryParameter`
for implementation details.
"""
assert N % 2 == 0 and N > 0, f"N must be a positive even number: {N}."
if self.N <= N and sum(self.box.boundary) == 0:
res = np.full(self.N, 10000, float)
else:
has_verlet = False
if hasattr(self, "neighbor_number"):
if self.neighbor_number.min() >= N and hasattr(self, "rc"):
tool.sort_neighbor(
self.verlet_list, self.distance_list, self.neighbor_number, N
)
has_verlet = True
if not has_verlet:
self.build_nearest_neighbor(N)
box, data = self._get_compute_view()
csp = CentroSymmetryParameter(data, box, N, self.verlet_list)
csp.compute()
res = csp.csp[: self.N]
self.update_data(self.data.with_columns(csp=res))
[docs]
def cal_common_neighbor_analysis(
self, rc: Optional[float] = None, max_neigh: Optional[int] = None
):
"""
Perform common neighbor analysis for structure identification. This will generate 'cna' column in self.data:
- 0 = Other (unknown coordination)
- 1 = FCC (face-centered cubic)
- 2 = HCP (hexagonal close-packed)
- 3 = BCC (body-centered cubic)
- 4 = ICO (icosahedral coordination)
Parameters
----------
rc : float, optional
Cutoff radius. If not given, it will use adaptive mode to do CNA.
max_neigh : int, optional
Maximum number of neighbors per atom.
Notes
-----
See :class:`~mdapy.common_neighbor_analysis.CommonNeighborAnalysis`
for implementation details.
"""
verlet_list = None
neighbor_number = None
safe_L = 15
repeat = np.ceil(safe_L / self.box.get_thickness()).astype(int)
for i in range(3):
if self.box.boundary[i] == 0:
repeat[i] = 1
if sum(repeat) == 3:
if hasattr(self, "rc"):
if rc is None:
if self.neighbor_number.min() >= 14:
tool.sort_neighbor(
self.verlet_list,
self.distance_list,
self.neighbor_number,
14,
)
verlet_list = self.verlet_list
else:
if self.rc < rc:
self.build_neighbor(rc, max_neigh)
verlet_list = self.verlet_list
neighbor_number = self.neighbor_number
else:
if rc is not None:
self.build_neighbor(rc, max_neigh)
verlet_list = self.verlet_list
neighbor_number = self.neighbor_number
box, data = self._get_compute_view()
cna = CommonNeighborAnalysis(data, box, verlet_list, neighbor_number, rc)
cna.compute()
self.update_data(self.__data.with_columns(cna=cna.pattern[: self.N]))
[docs]
def cal_structure_factor(
self,
k_min: float,
k_max: float,
nbins: int,
cal_partial: bool = False,
atomic_form_factors: bool = False,
mode: str = "direct",
rc: Optional[float] = None,
nbin_rdf: int = 200,
window: bool = False,
) -> StructureFactor:
"""
Calculate static structure factor S(k).
Parameters
----------
k_min : float
Minimum wave vector magnitude.
k_max : float
Maximum wave vector magnitude.
nbins : int
Number of bins for k-space averaging.
cal_partial : bool, optional
If True, calculate partial structure factors. Default is False.
atomic_form_factors : bool, default=False
If True, use atomic form factors f to weigh the atoms' individual contributions to S(k). Atomic form factors are taken from `TU Graz <https://lampz.tugraz.at/~hadley/ss1/crystaldiffraction/atomicformfactors/formfactors.php>`_.
mode : str, optional
Calculation mode: 'direct', 'debye', or 'rdf'. Default is 'direct'.
rc : float, optional
Cutoff for the RDF intermediate (only used by ``mode='rdf'``).
Defaults to half of the smallest periodic box thickness.
nbin_rdf : int, default=200
Number of radial bins for the RDF (only used by ``mode='rdf'``).
window : bool, default=True
Apply the Lorch window when transforming g(r) → S(k) (only used
by ``mode='rdf'``).
Returns
-------
StructureFactor
Object containing structure factor data.
Notes
-----
See :class:`~mdapy.structure_factor.StructureFactor` for
implementation details and returned attributes.
"""
sfc = StructureFactor(
self.data,
self.box,
k_min,
k_max,
nbins,
cal_partial,
atomic_form_factors,
mode,
rc=rc,
nbin_rdf=nbin_rdf,
window=window,
)
sfc.compute()
return sfc
[docs]
def cal_bond_analysis(
self, rc: float, nbin: int, max_neigh: Optional[int] = None
) -> BondAnalysis:
"""
Analyze bond length and angle distributions.
Parameters
----------
rc : float
Cutoff radius for bond search.
nbin : int
Number of bins for histograms.
max_neigh : int, optional
Maximum number of neighbors per atom.
Returns
-------
BondAnalysis
Object containing bond distribution data.
Notes
-----
See :class:`~mdapy.bond_analysis.BondAnalysis` for implementation
details and returned attributes.
Examples
--------
>>> ba = system.cal_bond_analysis(rc=5.0, nbin=100)
>>> print(ba.bond_length_distribution)
"""
has_neigh = False
if hasattr(self, "rc"):
if self.rc >= rc:
has_neigh = True
if not has_neigh:
self.build_neighbor(rc, max_neigh)
box, data = self._get_compute_view()
ba = BondAnalysis(
data,
box,
rc,
nbin,
self.verlet_list,
self.distance_list,
self.neighbor_number,
)
ba.compute()
return ba
[docs]
def cal_angular_distribution_function(
self,
rc_dict: Dict[str, List[float]],
nbin: int,
max_neigh: Optional[int] = None,
) -> AngularDistributionFunction:
"""
Calculate angular distribution function (ADF).
Parameters
----------
rc_dict : dict of str to list of float
Dictionary mapping triplet patterns to cutoff radii.
Format: {'A-B-C': [rc_AB_min, rc_AB_max, rc_AC_min, rc_AC_max]}
where A, B, C are element symbols and A is the central atom.
Example: {'O-H-H': [0, 2.0, 0, 2.0]} for water molecules.
nbin : int
Number of angular bins.
max_neigh : int, optional
Maximum number of neighbors per atom.
Returns
-------
AngularDistributionFunction
Object containing ADF data.
Notes
-----
See :class:`~mdapy.angular_distribution_function.AngularDistributionFunction`
for implementation details and returned attributes.
"""
assert "element" in self.data.columns
rc = np.array(list(rc_dict.values())).max()
has_neigh = False
if hasattr(self, "rc"):
if self.rc >= rc:
has_neigh = True
if not has_neigh:
self.build_neighbor(rc, max_neigh)
box, data = self._get_compute_view()
adf = AngularDistributionFunction(
data,
box,
rc_dict,
nbin,
self.verlet_list,
self.distance_list,
self.neighbor_number,
)
adf.compute()
return adf
[docs]
def cal_radial_distribution_function(
self,
rc: float,
nbin: int = 100,
max_neigh: Optional[int] = None,
streaming: Optional[bool] = None,
) -> RadialDistributionFunction:
"""
Calculate radial distribution function g(r).
Parameters
----------
rc : float
Maximum distance for RDF calculation.
nbin : int, optional
Number of distance bins. Default is 100.
max_neigh : int, optional
Maximum number of neighbors per atom (only used by the Verlet path).
streaming : bool, optional
Selects the kernel:
* ``None`` (default): auto-pick — streaming when ``rc`` is large
relative to the box (``2*rc`` exceeds the smallest periodic
thickness divided by 3, i.e. the cutoff that makes the Verlet
list bloat up); Verlet path otherwise.
* ``True``: force the streaming kernel (no neighbour list built).
* ``False``: force the Verlet path (build/reuse a neighbour list).
Returns
-------
RadialDistributionFunction
Object containing RDF data and partial RDFs if applicable.
Notes
-----
See :class:`~mdapy.radial_distribution_function.RadialDistributionFunction`
for implementation details and returned attributes.
"""
box, data = self._get_compute_view()
# Auto-pick streaming when rc is so large that a Verlet list would
# store ~all atoms per atom. Threshold: rc above 1/3 of the smallest
# periodic thickness — at that point the cell list degenerates anyway
# so the streaming kernel's memory savings are the deciding factor.
if streaming is None:
try:
thickness = box.get_thickness()
except AttributeError:
thickness = np.linalg.norm(box.box, axis=1)
periodic_thicknesses = [
thickness[i] for i in range(3) if box.boundary[i]
]
min_thick = min(periodic_thicknesses) if periodic_thicknesses else float("inf")
streaming = rc >= min_thick / 3.0
# Pick species labels for the partial-RDF dict keys: prefer chemical
# ``element`` strings (so g_partial keys read as ``("Al", "Cu")``);
# fall back to integer ``type`` ids; otherwise everyone is species 0.
def _species_labels(view):
if "element" in view.columns:
return view["element"].to_numpy()
if "type" in view.columns:
return view["type"].to_numpy()
return np.zeros(view.shape[0], np.int32)
type_list = _species_labels(data)
if streaming:
# Match the legacy Verlet path's replication semantics: when the
# original box is too thin for the requested rc (rc > L/2 along
# any periodic axis), replicate just enough that rc fits the
# min-image convention in the enlarged cell. Streaming on the
# enlarged cell is still cheap because we only carry positions,
# not a Verlet list.
repeat = self.box.check_small_box(rc)
if sum(repeat) != 3:
from mdapy.tool_function import replicate
rep_data, rep_box = replicate(data, box, *repeat)
rep_x = rep_data["x"].to_numpy()
rep_y = rep_data["y"].to_numpy()
rep_z = rep_data["z"].to_numpy()
rep_types = _species_labels(rep_data)
rdf = RadialDistributionFunction(
rc,
nbin,
rep_box,
type_list=rep_types,
streaming=True,
x=rep_x,
y=rep_y,
z=rep_z,
)
else:
rdf = RadialDistributionFunction(
rc,
nbin,
box,
type_list=type_list,
streaming=True,
x=data["x"].to_numpy(),
y=data["y"].to_numpy(),
z=data["z"].to_numpy(),
)
else:
has_neigh = False
if hasattr(self, "rc"):
if self.rc >= rc:
has_neigh = True
if not has_neigh:
self.build_neighbor(rc, max_neigh)
# Refresh box/data: build_neighbor may have replicated the system
# into _enlarge_box/_enlarge_data; the verlet list now indexes
# into the enlarged frame, so type_list must come from there too.
box, data = self._get_compute_view()
type_list = _species_labels(data)
rdf = RadialDistributionFunction(
rc,
nbin,
box,
verlet_list=self.verlet_list,
distance_list=self.distance_list,
neighbor_number=self.neighbor_number,
type_list=type_list,
)
rdf.compute()
return rdf
[docs]
def average_by_neighbor(
self,
average_rc: float,
property_name: str,
include_self: bool = True,
output_name: Optional[str] = None,
max_neigh: Optional[int] = None,
) -> None:
"""
Average a property over local neighborhoods.
Parameters
----------
average_rc : float
Cutoff radius for local averaging.
property_name : str
Name of the column in data to average.
include_self : bool
Whether to include the central atom in averaging. Default is True.
output_name : str, optional
Name for the output column. If None, uses '{property_name}_ave'.
max_neigh : int, optional
Maximum number of neighbors per atom.
Raises
------
AssertionError
If property_name is not a column in data.
"""
assert property_name in self.__data.columns, f"{property_name} not in data."
if hasattr(self, "rc"):
if self.rc < average_rc:
self.build_neighbor(average_rc, max_neigh)
else:
self.build_neighbor(average_rc, max_neigh)
assert not hasattr(self, "_enlarge_data"), (
"average_by_neighbor only supports systems whose box is large enough "
"that no replica was built (i.e. self._enlarge_data must not exist)."
)
data = self.data
data = tool.average_by_neighbor(
average_rc,
data,
property_name,
self.verlet_list,
self.distance_list,
self.neighbor_number,
include_self,
output_name,
)[: self.N].rechunk()
self.update_data(data)
[docs]
def cal_cluster_analysis(
self,
rc: Union[float, int, Dict[str, float]] = 5.0,
max_neigh: Optional[int] = None,
) -> None:
"""
Perform cluster analysis to identify connected atomic groups. The results will be added as self.data['cluster_id'].
Parameters
----------
rc : float, int, or dict, optional
Cutoff radius for cluster connectivity. Can be:
* **float/int**: Single cutoff for all atom pairs
* **dict**: Type-specific cutoffs, e.g., {'1-1': 1.5, '1-2': 1.3}
Default is 5.0.
max_neigh : int, optional
Maximum number of neighbors per atom.
Notes
-----
See :class:`~mdapy.cluster_analysis.ClusterAnalysis` for
implementation details.
Examples
--------
Single cutoff:
>>> system.cal_cluster_analysis(rc=3.0)
Type-specific cutoffs:
>>> rc_dict = {"1-1": 2.8, "1-2": 3.0, "2-2": 3.2}
>>> system.cal_cluster_analysis(rc=rc_dict)
>>> # cluster_id column added to system.data
"""
if isinstance(rc, (int, float, np.integer, np.floating)):
max_rc = float(rc)
elif isinstance(rc, dict):
max_rc = max([i for i in rc.values()])
else:
raise TypeError(
"rc should be a positive number, or a dict like {'1-1':1.5, '1-2':1.3}"
)
if hasattr(self, "rc"):
if self.rc < max_rc:
self.build_neighbor(max_rc, max_neigh)
else:
self.build_neighbor(max_rc, max_neigh)
type_list = None
if isinstance(rc, dict):
assert "type" in self.data.columns, (
"Must have type for multi rc cluster calculation."
)
type_list = self.data["type"].to_numpy(allow_copy=False)
ca = ClusterAnalysis(
rc, self.verlet_list, self.distance_list, self.neighbor_number, type_list
)
ca.compute()
self.update_data(
self.data.with_columns(cluster_id=ca.particleClusters[: self.N])
)
[docs]
def cal_structure_entropy(
self,
rc: float,
sigma: float,
use_local_density: bool = False,
average_rc: float = 0.0,
max_neigh: Optional[int] = None,
) -> None:
"""
Calculate structural entropy for disorder quantification.
Parameters
----------
rc : float
Cutoff radius for neighbor search.
sigma : float
Width parameter for Gaussian kernel.
use_local_density : bool, optional
If True, use local density normalization. Default is False.
average_rc : float, optional
Radius for local averaging of entropy. If 0, no averaging.
Default is 0.0.
max_neigh : int, optional
Maximum number of neighbors per atom.
Notes
-----
See :class:`~mdapy.structure_entropy.StructureEntropy` for
implementation details.
Examples
--------
>>> system.cal_structure_entropy(rc=5.0, sigma=0.5)
>>> # entropy column added to system.data
With local averaging:
>>> system.cal_structure_entropy(rc=5.0, sigma=0.5, average_rc=3.0)
>>> # entropy and entropy_ave columns added to system.data
"""
if hasattr(self, "rc"):
if self.rc < rc:
self.build_neighbor(rc, max_neigh)
else:
self.build_neighbor(rc, max_neigh)
box, _ = self._get_compute_view()
SE = StructureEntropy(
box,
self.verlet_list,
self.distance_list,
self.neighbor_number,
rc,
sigma,
use_local_density,
average_rc,
)
SE.compute()
data = self.data.with_columns(entropy=SE.entropy[: self.N])
if average_rc > 0:
data = data.with_columns(entropy_ave=SE.entropy_ave[: self.N])
self.update_data(data)
[docs]
def cal_voronoi_volume(self) -> None:
"""
Calculate Voronoi cell volumes and related properties.
This method computes the Voronoi tessellation and adds three columns
to the data DataFrame:
* **volume**: Atomic volume from Voronoi cell
* **neighbor_number**: Coordination number from Voronoi neighbors
* **cavity_radius**: This is the distance from the atom to the farthest vertex of its Voronoi cell, representing the radius of the largest empty sphere (containing no other atoms) that touches the atom.
Notes
-----
See :class:`~mdapy.voronoi.Voronoi` for implementation details.
Examples
--------
>>> system.cal_voronoi_volume()
>>> # volume, neighbor_number, cavity_radius columns added to data
>>> avg_volume = system.data["volume"].mean()
"""
vor = Voronoi(self.box, self.data)
volume, neighbor_number, cavity_radius = vor.get_volume()
self.update_data(
self.data.with_columns(
volume=volume,
neighbor_number=neighbor_number,
cavity_radius=cavity_radius,
)
)
[docs]
def cal_chemical_species(
self,
search_species: Optional[List[str]] = None,
element_list: Optional[List[str]] = None,
check_most: int = 10,
add_mol_id: bool = False,
scale: float = 0.6,
) -> Dict[str, int]:
"""
Identify chemical species based on atomic connectivity.
Two atoms *i* and *j* are considered bonded if their interatomic distance
satisfies:
rij <= (vdW_radius_i + vdW_radius_j) * scale
where vdW_radius is the van der Waals radius of the corresponding element,
and ``scale`` is an empirical scaling factor. This approach is similar to
the connectivity detection method used in OpenBabel.
Parameters
----------
search_species : list of str, optional
Molecular formulas to search for, e.g. ``['H2O', 'CO2', 'Cl2', 'N2']``.
If provided, only these species will be counted. Otherwise, the most
frequent species will be returned.
element_list : list of str, optional
List of element symbols corresponding to atom types, e.g.
``['C', 'H', 'O']``. This is required if the ``'element'`` column is
not present in ``self.data``.
check_most : int, default=10
Number of most frequent species to return when ``search_species`` is None.
add_mol_id : bool, default=False
If True and ``search_species`` is provided, add a new column ``'mol_id'``
to the internal data. The molecule IDs are assigned according to the order
of ``search_species`` (zero-based index). Atoms not belonging to any
searched species are assigned ``-1``.
scale : float, default=0.6
Scaling factor applied to the sum of van der Waals radii when determining
bonding cutoff distances.
Returns
-------
dict of str to int
Dictionary mapping species formulas to their corresponding counts.
Examples
--------
>>> species = system.cal_chemical_species(["H2O", "CO2"])
>>> print(species)
{'H2O': 125, 'CO2': 42}
"""
import re
if "element" in self.data.columns:
element_list = self.data["element"].unique().sort()
ele2type = {j: i + 1 for i, j in enumerate(element_list)}
self.update_data(
self.data.with_columns(
pl.col("element").replace_strict(ele2type).rechunk().alias("type")
)
)
else:
assert element_list is not None, (
"'element_list' must be provided because no 'element' column "
"is present in the data."
)
assert "type" in self.data.columns, (
"'type' column is required when 'element' column is not available."
)
Ntype = self.data["type"].max()
assert len(element_list) >= Ntype, (
f"'element_list' must contain at least {Ntype} elements to match "
f"the maximum atom type index ({Ntype})."
)
partial_cutoff = {}
for type1 in range(len(element_list)):
for type2 in range(type1, len(element_list)):
partial_cutoff[f"{type1 + 1}-{type2 + 1}"] = (
vdw_radii[atomic_numbers[element_list[type1]]]
+ vdw_radii[atomic_numbers[element_list[type2]]]
) * scale
self.cal_cluster_analysis(partial_cutoff, max_neigh=None)
if search_species is not None:
pattern = r"([A-Z][a-z]?)(\d*)"
trans_search_species = []
for old_name in search_species:
matches = re.findall(pattern, old_name)
matches.sort()
name = ""
for i, j in matches:
if len(j):
name += i * int(j)
else:
name += i
trans_search_species.append(name)
first_step = (
self.__data.with_columns(
pl.lit(
np.array(element_list)[(self.__data["type"] - 1).to_numpy()]
).alias("type_name")
)
.group_by("cluster_id")
.agg(pl.col("type_name"))
.with_columns(pl.col("type_name").list.sort())
.with_columns(pl.col("type_name").list.join(""))
)
res = (first_step["type_name"].value_counts()).filter(
pl.col("type_name").is_in(trans_search_species)
)
res = dict(zip(res[:, 0], res[:, 1]))
species = {}
for i, j in zip(search_species, trans_search_species):
if j not in res.keys():
species[i] = 0
else:
species[i] = res[j]
if add_mol_id:
clus_mol = first_step.with_columns(
pl.col("type_name")
.replace_strict(
{j: i for i, j in enumerate(trans_search_species)}, default=-1
)
.rechunk()
.alias("mol_id")
)
self.update_data(
self.data.with_columns(
pl.col("cluster_id")
.replace_strict(
dict(zip(clus_mol["cluster_id"], clus_mol["mol_id"]))
)
.rechunk()
.alias("mol_id")
)
)
else:
res = (
(
self.__data.with_columns(
pl.lit(
np.array(element_list)[(self.__data["type"] - 1).to_numpy()]
).alias("type_name")
)
.group_by("cluster_id")
.agg(pl.col("type_name"))
.with_columns(pl.col("type_name").list.sort())
.with_columns(pl.col("type_name").list.join(""))["type_name"]
.value_counts()
)
.sort("count", descending=True)
.limit(check_most)
)
species = dict(zip(res[:, 0], res[:, 1]))
return species
if __name__ == "__main__":
system = System("tests/input_files/water.xyz")
# res = system.cal_chemical_species(search_species=['H2O'], add_mol_id=True)
res = system.cal_chemical_species(scale=0.4)
print(res)
# system.write_xyz('mol.xyz')