Source code for mdapy.system

# Copyright (c) 2022-2026, Yongchao Wu in Aalto University
# This file is from the mdapy project, released under the BSD 3-Clause License.

"""
Core System Class for MDAPY
===========================

This module provides the central System class that serves as the main interface
for atomic simulation data and analysis in mdapy. The System class integrates
all analysis modules and provides a unified API for structural analysis,
property calculations, and data manipulation.

Classes
-------
System : Core class representing an atomic system
"""

from mdapy.box import Box
from mdapy.load_save import BuildSystem, SaveSystem
from mdapy.knn import NearestNeighbor
from mdapy.neighbor import Neighbor
from mdapy.voronoi import Voronoi
from mdapy.structure_entropy import StructureEntropy
from mdapy.cluster_analysis import ClusterAnalysis
from mdapy.calculator import CalculatorMP
from mdapy.identify_diamond_structure import IdentifyDiamondStructure
from mdapy.polyhedral_template_matching import PolyhedralTemplateMatching
from mdapy.common_neighbor_analysis import CommonNeighborAnalysis
from mdapy.steinhardt_bond_orientation import SteinhardtBondOrientation
from mdapy.radial_distribution_function import RadialDistributionFunction
from mdapy.structure_factor import StructureFactor
from mdapy.centro_symmetry_parameter import CentroSymmetryParameter
from mdapy.warren_cowley_parameter import WarrenCowleyParameter
from mdapy.identify_fcc_planar_faults import IdentifyFccPlanarFaults
from mdapy.ackland_jones_analysis import AcklandJonesAnalysis
from mdapy.common_neighbor_parameter import CommonNeighborParameter
from mdapy.atomic_temperature import AtomicTemperature
from mdapy.bond_analysis import BondAnalysis
from mdapy.chill_plus import ChillPlus
from mdapy.build_bond import build_bond as _build_bond_pairs
from mdapy.angular_distribution_function import AngularDistributionFunction
import mdapy.tool_function as tool
from mdapy.data import atomic_numbers, vdw_radii
from typing import Optional, Dict, TYPE_CHECKING, Union, Iterable, Any, List, Tuple
import numpy as np
import polars as pl


if TYPE_CHECKING:
    from ase import Atoms
    from ovito.data import DataCollection


[docs] class System: """ Core class representing an atomic system with integrated analysis capabilities. The System class is the central data structure in mdapy, providing a unified interface for loading, manipulating, and analyzing atomic configurations. It integrates various structural analysis methods, property calculators, and visualization tools. Parameters ---------- filename : str, optional Path to an atomic configuration file. Supports various formats including POSCAR, LAMMPS data/dump, XYZ, mp etc. Format is auto-detected or can be specified via the `format` parameter. data : pl.DataFrame, optional Polars DataFrame containing atomic information. Must include columns 'x', 'y', 'z' for positions. Other common columns include 'type', 'element', 'vx', 'vy', 'vz', 'fx', 'fy', 'fz', etc. pos : np.ndarray, optional Array of shape (N, 3) containing atomic positions. Used in conjunction with `box` parameter. box : int, float, Iterable[float], np.ndarray, or Box, optional Simulation box specification. Required when using `data` or `pos` parameters. See :class:`~mdapy.box.Box` for supported formats. ase_atom : ase.Atoms, optional ASE Atoms object to initialize the system from. ovito_atom : ovito.data.DataCollection, optional OVITO DataCollection object to initialize the system from. format : str, optional File format specification when loading from `filename`. If None, format is auto-detected. global_info : dict, optional Dictionary containing global system properties such as total energy, virial, stress tensor, timestep, etc. Defaults to empty dict. Attributes ---------- verlet_list : np.ndarray, optional Neighbor list array when neighbors are built. distance_list : np.ndarray, optional Distance list array when neighbors are built. neighbor_number : np.ndarray, optional Number of neighbors for each atom when neighbors are built. rc : float, optional Cutoff radius used for neighbor list construction. Notes ----- The System class uses lazy evaluation for many analyses - neighbor lists and other expensive computations are only performed when needed. Results are cached and reused when possible. At initialization, exactly one of the following must be provided: - `filename` - `data` and `box` - `pos` and `box` - `ase_atom` - `ovito_atom` Examples -------- Creating a system from different sources: .. code-block:: python from mdapy.system import System import numpy as np # Load from file (auto-detect format) system = System("config.dump") # Load from file with explicit format system = System("POSCAR", format="poscar") # Create from numpy array pos = np.random.rand(100, 3) * 10 system = System(pos=pos, box=10) # Create from polars DataFrame import polars as pl data = pl.DataFrame( { "x": np.random.rand(100), "y": np.random.rand(100), "z": np.random.rand(100), "type": np.ones(100, dtype=int), } ) system = System(data=data, box=[10, 10, 10]) Performing structural analysis: .. code-block:: python # Identify crystal structures system.cal_polyhedral_template_matching() # Calculate radial distribution function rdf = system.cal_radial_distribution_function(rc=10.0, nbin=200) # Perform cluster analysis system.cal_cluster_analysis(rc=3.0) Accessing and modifying data: .. code-block:: python # Get positions pos = system.data.select("x", "y", "z") # Add a new column new_data = system.data.with_columns(energy=np.random.rand(system.N)) system.update_data(new_data) # Save to file system.write("output.dump") See Also -------- mdapy.box.Box : Simulation box representation mdapy.calculator.CalculatorMP : Abstract calculator interface mdapy.load_save.BuildSystem : System construction utilities mdapy.load_save.SaveSystem : System export utilities """ def __init__( self, filename: Optional[str] = None, data: Optional[pl.DataFrame] = None, pos: Optional[np.ndarray] = None, box: Union[int, float, Iterable[float], np.ndarray, Box] = None, ase_atom: Optional["Atoms"] = None, ovito_atom: Optional["DataCollection"] = None, format: Optional[str] = None, global_info: Optional[Dict[str, Any]] = None, ): """Initialize a System object from various data sources.""" self.__global_info = {} if isinstance(filename, str): self.__data, self.box, self.__global_info = BuildSystem.from_file( filename, format ) elif data is not None and box is not None: self.__data, self.box = BuildSystem.from_data(data, box) elif pos is not None and box is not None: self.__data, self.box = BuildSystem.from_array(pos, box) elif ase_atom is not None: self.__data, self.box = BuildSystem.from_ase(ase_atom) elif ovito_atom is not None: self.__data, self.box, self.__global_info = BuildSystem.from_ovito( ovito_atom ) else: raise RuntimeError( "One must at least provide filename or [data, box] or [pos, box] or ase_atom or ovito_atom." ) # Make sure the initial DataFrame is single-chunked. The various # BuildSystem readers are inconsistent on this point and a multi- # chunk frame would later break `Series.to_numpy(allow_copy=False)`. self.__data = self.__data.rechunk() if not len(self.__global_info) and global_info is not None: self.__global_info = dict(global_info) self.__calc: Optional[CalculatorMP] = None @property def box(self) -> Box: """The simulation box. Assigning to this attribute (``system.box = ...``) is equivalent to calling :meth:`update_box` *without* ``scale_pos`` — i.e. atoms keep their Cartesian coordinates while the cell vectors change. Any cached neighbor / bond / Voronoi data is invalidated, since indices and distances depend on the box. """ return self.__box @box.setter def box(self, value: Union[int, float, Iterable[float], np.ndarray, Box]): new_box = value if isinstance(value, Box) else Box(value) self.__box = new_box # Invalidate caches that depend on the box. During __init__ none of # these attrs exist yet, so the hasattr guards make this a no-op. if isinstance(getattr(self, "_System__calc", None), CalculatorMP): self.__calc.results = {} for attr in ("verlet_list", "neighbor_number", "distance_list", "rc", "bond", "voro_verlet_list", "voro_distance_list", "voro_face_area", "voro_neighbor_number", "_enlarge_box", "_enlarge_data"): if hasattr(self, attr): delattr(self, attr) @property def calc(self) -> Optional[CalculatorMP]: return self.__calc @calc.setter def calc(self, value): if not isinstance(value, CalculatorMP): raise TypeError( f"calc must be CalculatorMP, instead of {type(value).__name__}" ) value.results = {} self.__calc = value @property def global_info(self) -> Dict[str, Any]: """ Obtain global system information. Returns ------- dict Dictionary containing global properties such as total energy, virial tensor, stress tensor, timestep, temperature, etc. Keys and values depend on the data source and attached calculator. """ return self.__global_info @property def data(self) -> pl.DataFrame: """ Access the atomic data DataFrame. Returns ------- pl.DataFrame Polars DataFrame containing per-atom properties. Always includes columns 'x', 'y', 'z' for Cartesian coordinates. May include additional columns such as 'type', 'element', 'vx', 'vy', 'vz' (velocities), 'fx', 'fy', 'fz' (forces), and various computed properties from analysis methods. Examples -------- >>> system = System("config.dump") >>> print(system.data.columns) ['x', 'y', 'z', 'type', 'vx', 'vy', 'vz'] """ return self.__data @property def N(self) -> int: """ Get the number of atoms in the system. Returns ------- int Total number of atoms/particles in the system. Examples -------- >>> system = System(pos=np.random.rand(100, 3), box=10) >>> print(system.N) 100 """ return self.__data.shape[0] def __repr__(self) -> str: """ Return string representation of the System. Returns ------- str Multi-line string containing atom number, box information, global properties, and a summary of the data DataFrame. """ if not self.__global_info: return f"Atom Number: {self.N}\n{self.box}\nParticle Information:\n{self.__data}" else: info = f"Atom Number: {self.N}\n{self.box}\n" for key in self.__global_info.keys(): info += f"{key}: {self.__global_info[key]}\n" info += f"Particle Information:\n{self.__data}" return info
[docs] def set_element(self, element: Union[str, List[str], Tuple[str], np.ndarray]): """ Set element names for atoms in the system. Parameters ---------- element : str, list of str, tuple of str, or np.ndarray Element specification: * **str**: Assigns the same element to all atoms * **list/tuple/array**: Must have length equal to N, specifying element for each atom individually Examples -------- Assign same element to all atoms: >>> system.set_element("Cu") Assign different elements: >>> elements = ["Cu"] * 50 + ["Al"] * 50 >>> system.set_element(elements) Notes ----- This method updates the 'element' column in the data DataFrame. If an 'element' column exists, it is replaced. """ if isinstance(element, str): df = self.__data.with_columns(pl.lit(element).alias("element")) else: assert len(element) == self.N, ( f"Length of element ({len(element)}) must equal the atom " f"number ({self.N})." ) if "element" in self.data.columns: df = self.__data.select(pl.all().exclude("element")).with_columns( pl.lit(np.array(element)).alias("element") ) else: df = self.__data.with_columns( pl.lit(np.array(element)).alias("element") ) self.update_data(df, True)
[docs] def set_type_by_element( self, element_list: Union[List[str], Tuple[str], np.ndarray] ): """ Set atom types based on element ordering. Parameters ---------- element_list : list of str, tuple of str, or np.ndarray Ordered list of unique elements. Atoms are assigned type numbers based on the index of their element in this list (starting from 1). Raises ------ AssertionError If data does not contain 'element' column or if any element in data is not present in the provided element list. Examples -------- >>> system.set_element(["Cu", "Cu", "Al", "Al"]) >>> system.set_type_by_element(["Cu", "Al"]) >>> print(system.data.select("element", "type")) ┌─────────┬──────┐ │ element │ type │ ├─────────┼──────┤ │ Cu │ 1 │ │ Cu │ 1 │ │ Al │ 2 │ │ Al │ 2 │ └─────────┴──────┘ Notes ----- This method creates or updates the 'type' column in the data DataFrame. Type numbers start from 1 and correspond to the order in the element list. """ assert "element" in self.data.columns, "Data must contain element column." for i in self.data["element"].unique(): assert i in element_list, ( f"element_list must include element {i!r} " "(seen in data['element'])." ) ele2type = {j: i for i, j in enumerate(element_list, start=1)} self.update_data( self.data.with_columns( pl.col("element") .replace_strict(ele2type, return_dtype=pl.Int32) .rechunk() .alias("type") ), True, )
[docs] def get_positions(self, reduced: bool = False) -> pl.DataFrame: """ Extract atomic positions from the system. Parameters ---------- reduced : bool, optional If True, return fractional (reduced) coordinates in the box basis. If False (default), return Cartesian coordinates. Returns ------- pl.DataFrame DataFrame containing position columns: * **reduced=False**: Columns 'x', 'y', 'z' (Cartesian coordinates) * **reduced=True**: Columns 'r_x', 'r_y', 'r_z' (fractional coordinates) Examples -------- >>> system = System("POSCAR") >>> cart_pos = system.get_positions(reduced=False) >>> frac_pos = system.get_positions(reduced=True) Notes ----- Fractional coordinates are computed using the inverse box matrix: reduced = positions @ box.inverse_box """ if reduced: inv = self.box.inverse_box # pos @ inv return self.data.select( r_x=pl.col("x") * inv[0, 0] + pl.col("y") * inv[1, 0] + pl.col("z") * inv[2, 0], r_y=pl.col("x") * inv[0, 1] + pl.col("y") * inv[1, 1] + pl.col("z") * inv[2, 1], r_z=pl.col("x") * inv[0, 2] + pl.col("y") * inv[1, 2] + pl.col("z") * inv[2, 2], ) else: return self.data.select("x", "y", "z")
[docs] def get_velocities(self) -> pl.DataFrame: """ Extract atomic velocities from the system. Returns ------- pl.DataFrame DataFrame with columns 'vx', 'vy', 'vz' containing velocity components for each atom. Raises ------ AssertionError If the data does not contain velocity columns ('vx', 'vy', 'vz'). Examples -------- >>> system = System("relax.dump") # File with velocities >>> velocities = system.get_velocities() """ for i in ["vx", "vy", "vz"]: assert i in self.data.columns return self.data.select("vx", "vy", "vz")
[docs] def set_pka( self, energy: float, direction: np.ndarray, index: Optional[int] = None, element: Optional[str] = None, factor: float = 1.0, ): """ Set primary knock-on atom (PKA) for radiation damage simulation. This method assigns initial kinetic energy and direction to a PKA atom, commonly used to initialize cascade simulations in radiation damage studies. We assume that the velocity in the units of A/fs, you need to use `factor` to convert your velocity to this units. Parameters ---------- energy : float Kinetic energy to assign to the PKA (in eV). direction : np.ndarray 1D array of length 3 specifying the velocity direction vector. Will be normalized automatically. index : int, optional Index of the atom to set as PKA. If None, selects by nearest center atom. element : str, optional Element name to select PKA. If provided and index is None, selects by nearest center atom with same element. factor : float Convert your velocity to the units of A/fs. Defaults to 1.0. Notes ----- This method updates the velocity columns ('vx', 'vy', 'vz') in the data DataFrame. Examples -------- Set PKA by atom index: >>> system.set_pka(energy=1000.0, direction=np.array([1, 3, 5]), index=50) Set PKA by element: >>> system.set_pka(energy=2000.0, direction=np.array([1, 3, 5]), element="Fe") """ self.update_data( tool._set_pka( self.data.with_columns( pl.col("vx") * factor, pl.col("vy") * factor, pl.col("vz") * factor ), self.box, energy, direction, index, element, ).with_columns( pl.col("vx") / factor, pl.col("vy") / factor, pl.col("vz") / factor ) )
[docs] def get_energy(self) -> float: """ Calculate total potential energy of the system. Returns ------- float Total potential energy in the units eV. Raises ------ AssertionError If no calculator is attached to the system. Examples -------- >>> from mdapy.eam import EAM >>> system = System("config.dump") >>> system.calc = EAM("potential.eam") >>> total_energy = system.get_energy() """ assert isinstance(self.calc, CalculatorMP), "Must assign a calc first." return self.calc.get_energy(self.data, self.box)
[docs] def get_energies(self) -> np.ndarray: """ Calculate per-atom potential energies in the units eV. Returns ------- np.ndarray 1D array of shape (N,) containing potential energy for each atom. Raises ------ AssertionError If no calculator is attached to the system. """ assert isinstance(self.calc, CalculatorMP), "Must assign a calc first." return self.calc.get_energies(self.data, self.box)
[docs] def get_force(self) -> np.ndarray: """ Calculate forces acting on each atom in the units eV/A. Returns ------- np.ndarray 2D array of shape (N, 3) containing force components [fx, fy, fz] for each atom. Raises ------ AssertionError If no calculator is attached to the system. Examples -------- >>> from mdapy.eam import EAM >>> system = System("config.dump") >>> system.calc = EAM("potential.eam") >>> forces = system.get_force() >>> print(forces.shape) (1000, 3) """ assert isinstance(self.calc, CalculatorMP), "Must assign a calc first." return self.calc.get_forces(self.data, self.box)
[docs] def get_stress(self) -> np.ndarray: """ Calculate stress tensor of the system in the units eV/A^3. Returns ------- np.ndarray 1D array of shape (6,) containing stress tensor components in Voigt notation: [σ_xx, σ_yy, σ_zz, σ_yz, σ_xz, σ_xy]. Raises ------ AssertionError If no calculator is attached to the system. Examples -------- >>> from mdapy.eam import EAM >>> system = System("config.dump") >>> system.calc = EAM("potential.eam") >>> stress = system.get_stress() >>> print(stress) # [σ_xx, σ_yy, σ_zz, σ_yz, σ_xz, σ_xy] """ assert isinstance(self.calc, CalculatorMP), "Must assign a calc first." return self.calc.get_stress(self.data, self.box)
[docs] def get_virials(self) -> np.ndarray: """ Calculate per-atom virial tensors in the units eV. Returns ------- np.ndarray 2D array of shape (N, 9) containing virial tensor components for each atom, ordered as: [v_xx, v_xy, v_xz, v_yx, v_yy, v_yz, v_zx, v_zy, v_zz]. Raises ------ AssertionError If no calculator is attached to the system. Examples -------- >>> from mdapy.eam import EAM >>> system = System("config.dump") >>> system.calc = EAM("potential.eam") >>> virials = system.get_virials() >>> print(virials.shape) (1000, 9) """ assert isinstance(self.calc, CalculatorMP), "Must assign a calc first." return self.calc.get_virials(self.data, self.box)
[docs] def update_data( self, data: pl.DataFrame, reset_calculator: bool = False, reset_neighbor: bool = False, reset_calcolator: Optional[bool] = None, ) -> None: """ Update the atomic data DataFrame. Parameters ---------- data : pl.DataFrame New DataFrame containing updated atomic information. Should maintain the same structure (columns) as the original data. reset_calculator : bool, optional If True, clears cached results from the attached calculator. Set to True when atomic positions or types/element change. Default is False. reset_neighbor : bool, optional If True, clear all neighbor information. Set to True when atomic positions change. Default is False. reset_calcolator : bool, optional *Deprecated.* Misspelled alias for ``reset_calculator`` kept for backwards compatibility; emits a ``DeprecationWarning`` and will be removed in a future release. Examples -------- >>> # Add a new property column >>> new_data = system.data.with_columns( ... temperature=np.random.rand(system.N) * 300 ... ) >>> system.update_data(new_data) >>> # Update positions (reset calculator) >>> new_data = system.data.with_columns(x=system.data["x"] + 0.1) >>> system.update_data(new_data, reset_calculator=True) Notes ----- When ``reset_calculator=True``, any cached energy, force, or stress calculations are invalidated and will be recomputed on next access. The DataFrame is rechunked into a single contiguous chunk on write. Polars operations such as ``filter()`` or ``concat()`` can leave a DataFrame with multiple internal chunks; downstream code that calls ``Series.to_numpy(allow_copy=False)`` then raises. Rechunking once at the assignment boundary makes every consumer safe. """ if reset_calcolator is not None: import warnings warnings.warn( "`reset_calcolator` is a misspelling and is deprecated; " "use `reset_calculator` instead.", DeprecationWarning, stacklevel=2, ) reset_calculator = reset_calculator or reset_calcolator self.__data = data.rechunk() if reset_calculator and isinstance(self.calc, CalculatorMP): self.calc.results = {} if reset_neighbor: if hasattr(self, "verlet_list"): del self.verlet_list, self.neighbor_number, self.distance_list if hasattr(self, "rc"): del self.rc if hasattr(self, "bond"): del self.bond if hasattr(self, "voro_verlet_list"): del ( self.voro_verlet_list, self.voro_distance_list, self.voro_face_area, self.voro_neighbor_number, ) if hasattr(self, "_enlarge_box"): del self._enlarge_box, self._enlarge_data
def _get_compute_view(self) -> Tuple[Box, pl.DataFrame]: """Return ``(box, data)`` to feed to per-atom analysis kernels. When :meth:`build_neighbor` (or :meth:`build_voronoi_neighbor` / :meth:`build_nearest_neighbor`) detects a system whose box is smaller than required, it transparently builds a replicated copy and stores it as ``self._enlarge_box`` / ``self._enlarge_data``. Analysis methods must then operate on the *enlarged* view (so neighbour indices align), and the caller is responsible for slicing the result back to ``self.N``. Returns ------- Tuple[Box, pl.DataFrame] The enlarged view if one exists, otherwise the system's own box and DataFrame. """ if hasattr(self, "_enlarge_data"): return self._enlarge_box, self._enlarge_data return self.box, self.data
[docs] def update_box( self, box: Union[int, float, Iterable[float], np.ndarray, Box], scale_pos: bool = False, ) -> None: """ Update the simulation box. Parameters ---------- box : int, float, Iterable[float], np.ndarray, or Box New box specification. See :class:`~mdapy.box.Box` for accepted formats. scale_pos : bool, optional If True, scales atomic positions affinely with the box change. Useful for applying strain or changing lattice constants while maintaining fractional coordinates. Default is False. Raises ------ AssertionError If `scale_pos=True` but not all boundaries are periodic. Examples -------- Change box without scaling positions: >>> system.update_box([20, 20, 20]) Apply uniform strain by scaling box and positions: >>> system.update_box([11, 11, 11], scale_pos=True) Notes ----- When `scale_pos=True`, the transformation is: new_pos = old_pos @ (old_box^-1 @ new_box) This preserves fractional coordinates in the new box. """ new_box = Box(box) if scale_pos: assert sum(new_box.boundary) == 3, ( "only support all periodic boundary condition." ) map_matrix = np.linalg.solve(self.box.box, new_box.box) self.__data = self.__data.with_columns( pl.col("x") - self.box.origin[0], pl.col("y") - self.box.origin[1], pl.col("z") - self.box.origin[2], ).with_columns( x=pl.col("x") * map_matrix[0, 0] + pl.col("y") * map_matrix[1, 0] + pl.col("z") * map_matrix[2, 0] + new_box.origin[0], y=pl.col("x") * map_matrix[0, 1] + pl.col("y") * map_matrix[1, 1] + pl.col("z") * map_matrix[2, 1] + new_box.origin[1], z=pl.col("x") * map_matrix[0, 2] + pl.col("y") * map_matrix[1, 2] + pl.col("z") * map_matrix[2, 2] + new_box.origin[2], ) # The `box` setter handles all neighbor / calculator cache # invalidation (see the property definition at the top of the class). self.box = new_box
[docs] def wrap_pos(self) -> None: """Wrap positions into box for PBC boundary.""" self.update_data(tool.wrap_pos(self.data, self.box), True, True)
[docs] def replicate(self, nx: int, ny: int, nz: int) -> None: """ Replicate the system by creating a supercell. Parameters ---------- nx : int Number of replications along x-direction. ny : int Number of replications along y-direction. nz : int Number of replications along z-direction. Examples -------- Create a 2x2x2 supercell: >>> system = System("unit_cell.poscar") >>> system.replicate(2, 2, 2) >>> print(system.N) # 8 times the original number Notes ----- This method modifies the system in place. The box is scaled by [nx, ny, nz] and atoms are replicated accordingly. """ assert nx > 0 assert ny > 0 assert nz > 0 data, box = tool.replicate(self.__data, self.box, int(nx), int(ny), int(nz)) self.update_data(data, True, True) self.update_box(box)
[docs] def to_ovito(self) -> "DataCollection": """ Convert system to OVITO DataCollection object. Returns ------- ovito.data.DataCollection OVITO data structure containing the atomic configuration. Raises ------ ImportError If OVITO is not installed. Examples -------- >>> data = system.to_ovito() >>> # Use OVITO's analysis or visualization tools >>> from ovito.modifiers import CommonNeighborAnalysisModifier >>> data.apply(CommonNeighborAnalysisModifier()) """ return SaveSystem.to_ovito(self.__data, self.box, self.__global_info)
[docs] def to_ase(self) -> "Atoms": """ Convert system to ASE Atoms object. Returns ------- ase.Atoms ASE Atoms object containing the atomic configuration. Raises ------ ImportError If ASE is not installed. AssertionError If data does not contain an 'element' column. Examples -------- >>> atoms = system.to_ase() >>> # Use ASE's functionality >>> from ase.io import write >>> write("output.xyz", atoms) """ return SaveSystem.to_ase(self.__data, self.box)
[docs] def write_mp(self, output_name: str) -> None: """ Save system to mdapy `.mp` (parquet-based) format. Parameters ---------- output_name : str Output file path. The `.mp` extension is recommended. Notes ----- The `.mp` format round-trips `box`, the full per-atom DataFrame, and `global_info` (including non-numeric metadata) without loss. Examples -------- >>> system.write_mp("config.mp") """ SaveSystem.write_mp(output_name, self.data, self.box, self.global_info)
[docs] def write_xyz(self, output_name: str, classical: bool = False, compress=False): """ Save the system to XYZ format. Parameters ---------- output_name : str Output file path. ``.xyz`` (or ``.xyz.gz`` with ``compress=True``) is the conventional extension. classical : bool, optional If True, write the legacy 4-column XYZ ``element x y z`` form (no Properties / Lattice metadata). Defaults to False, i.e. extended XYZ that round-trips box, velocities, forces, etc. compress : bool, optional If True, gzip the output (``.gz`` is appended if missing). Defaults to False. Notes ----- The system's ``global_info`` is forwarded to the XYZ comment line as extra metadata keys. Existing keys (``Lattice``, ``Properties``, ``pbc``) are reserved by the writer and must not appear in ``global_info``. """ SaveSystem.write_xyz( output_name, self.box, self.data, classical, compress, **self.global_info )
[docs] def write_poscar( self, output_name: str, reduced_pos: bool = False, selective_dynamics: bool = False, compress=False, ): """ Save the system to a VASP POSCAR file. Parameters ---------- output_name : str Output file path. reduced_pos : bool, optional If True, write fractional ``Direct`` coordinates instead of Cartesian. Defaults to False. selective_dynamics : bool, optional If True, append a ``Selective dynamics`` block. The data must contain ``sdx``, ``sdy``, ``sdz`` boolean-flag columns. Defaults to False. compress : bool, optional If True, gzip the output. Defaults to False. Notes ----- Atoms are grouped by element in ``self.data``'s row order; if you need a specific element ordering, sort the DataFrame first. """ SaveSystem.write_poscar( output_name, self.box, self.data, reduced_pos=reduced_pos, selective_dynamics=selective_dynamics, compress=compress, )
[docs] def write_data( self, output_name: str, element_list: Optional[List[str]] = None, num_type: Optional[int] = None, data_format: str = "atomic", compress=False, ): """ Save the system to a LAMMPS data file. Parameters ---------- output_name : str Output file path. element_list : list of str, optional Element ordering for the LAMMPS ``Masses`` section. When the system has an ``element`` column, the LAMMPS ``type`` column is derived locally from this ordering (the System's own DataFrame is *not* mutated by this call). Element names not present in the data raise ``AssertionError``. If the data already has a ``type`` column and no ``element`` column, the existing types are used and ``element_list`` is recorded only in the ``Masses`` block. num_type : int, optional Total number of atom types declared in the file. Defaults to ``data['type'].max()``. Must be at least that maximum. data_format : str, optional ``'atomic'`` (id type x y z) or ``'charge'`` (id type q x y z). Defaults to ``'atomic'``. compress : bool, optional If True, gzip the output. Defaults to False. """ # When element_list is provided AND the data has an element column, # derive the LAMMPS `type` column from the element ordering — but on # a *local copy* so the System's own DataFrame is not mutated by a # write call. data = self.data if element_list is not None and "element" in data.columns: for ele in data["element"].unique(): assert ele in element_list, ( f"element_list must include element {ele!r}." ) ele2type = {j: i for i, j in enumerate(element_list, start=1)} data = data.with_columns( pl.col("element") .replace_strict(ele2type, return_dtype=pl.Int32) .rechunk() .alias("type") ) SaveSystem.write_data( output_name, self.box, data, element_list=element_list, num_type=num_type, data_format=data_format, compress=compress, )
[docs] def write_dump(self, output_name: str, timestep: float = 0, compress=False): """ Save the system to a single-frame LAMMPS dump file. Parameters ---------- output_name : str Output file path. ``.dump`` / ``.lammpstrj`` are conventional. timestep : float, optional Value written into the ``ITEM: TIMESTEP`` header. Defaults to 0. compress : bool, optional If True, gzip the output. Defaults to False. Notes ----- For multi-frame trajectories use :class:`mdapy.Trajectory` instead. Numeric columns plus the ``element`` string column are preserved; other non-numeric columns are dropped. """ SaveSystem.write_dump( output_name, self.box, self.data, timestep=timestep, compress=compress )
[docs] def build_neighbor( self, rc: float, max_neigh: Optional[int] = None, ) -> None: """ Build neighbor list for the system. This method constructs a Verlet neighbor list within a cutoff radius, which is required for many analysis methods. For small systems, the box may be automatically replicated to ensure sufficient neighbors. Parameters ---------- rc : float Cutoff radius for neighbor search (in simulation units). max_neigh : int, optional Maximum number of neighbors per atom. If None, automatically determined based on the actual neighbor number Increase this if you get warnings about insufficient neighbor slots. Attributes Set -------------- verlet_list : np.ndarray 2D array of shape (N, max_neigh) containing neighbor indices. Unfilled slots contain -1. distance_list : np.ndarray 2D array of shape (N, max_neigh) containing distances to neighbors. neighbor_number : np.ndarray 1D array of shape (N,) containing number of neighbors for each atom. rc : float The cutoff radius used. _enlarge_box : Box, optional replicated box for small system. _enlarge_data : pl.DataFrame, optional replicated atom dataframe for small system Examples -------- >>> system.build_neighbor(rc=5.0) >>> print(system.verlet_list.shape) (1000, 50) # 1000 atoms, up to 50 neighbors each >>> print(system.neighbor_number) [12 12 12 ... 12 12 12] # FCC coordination See :class:`~mdapy.neighbor.Neighbor` for implementation details. """ neigh = Neighbor(rc, self.box, self.data, max_neigh) neigh.compute() self.rc = rc if hasattr(neigh, "_enlarge_box"): self._enlarge_box: Box = neigh._enlarge_box if hasattr(neigh, "_enlarge_data"): self._enlarge_data: pl.DataFrame = neigh._enlarge_data self.verlet_list, self.distance_list, self.neighbor_number = ( neigh.verlet_list, neigh.distance_list, neigh.neighbor_number, )
[docs] def build_voronoi_neighbor( self, a_face_area_threshold: float = -1.0, r_face_area_threshold: float = -1.0 ) -> None: """ Calculate Voronoi neighbors and face properties for all atoms. This method performs Voronoi tessellation to identify nearest neighbors based on shared Voronoi cell faces. It can filter neighbors based on face area thresholds to exclude insignificant contacts. Parameters ---------- a_face_area_threshold : float, optional Absolute face area threshold. Faces with area below this value are ignored as neighbors. Default is -1.0 (no filtering). r_face_area_threshold : float, optional Relative face area threshold (fraction of average face area). Faces with relative area below this value are ignored. Default is -1.0 (no filtering). Attributes Set -------------- voro_verlet_list : np.ndarray 2D array of shape (N, max_neigh) containing neighbor indices. Unfilled slots contain -1. voro_distance_list : np.ndarray 2D array of shape (N, max_neigh) containing distances to neighbors. voro_neighbor_number : np.ndarray 1D array of shape (N,) containing number of neighbors for each atom. voro_face_area : np.ndarray 2D array of shape (N, max_neighbors) containing the area of the Voronoi face shared with each neighbor. _enlarge_box : Box, optional replicated box for small system. _enlarge_data : pl.DataFrame, optional replicated atom dataframe for small system Examples -------- >>> system.build_voronoi_neighbor() >>> print(system.voro_verlet_list.shape) (1000, 50) # 1000 atoms, up to 50 neighbors each See :class:`~mdapy.voronoi.Voronoi` for implementation details. """ vor = Voronoi(self.box, self.data) ( self.voro_verlet_list, self.voro_distance_list, self.voro_face_area, self.voro_neighbor_number, ) = vor.get_neighbor(a_face_area_threshold, r_face_area_threshold) if hasattr(vor, "_enlarge_box"): self._enlarge_box: Box = vor._enlarge_box if hasattr(vor, "_enlarge_data"): self._enlarge_data: pl.DataFrame = vor._enlarge_data
[docs] def build_nearest_neighbor(self, k: int) -> None: """ Calculate `k` nearest neighbor information. Parameters ---------- k : int Maximum number of neighbors to find per atom. Attributes Set -------------- verlet_list : np.ndarray 2D array of shape (N, k) containing neighbor indices. distance_list : np.ndarray 2D array of shape (N, k) containing distances to neighbors. neighbor_number : np.ndarray 1D array of shape (N,) containing number of neighbors for each atom. _enlarge_box : Box, optional replicated box for small system. _enlarge_data : pl.DataFrame, optional replicated atom dataframe for small system Examples -------- >>> system.build_nearest_neighbor(12) >>> print(system.verlet_list.shape) (1000, 12) # 1000 atoms, up to 12 sorted neighbors each See :class:`~mdapy.knn.NearestNeighbor` for implementation details. """ kdt = NearestNeighbor(self.data, self.box, k) kdt.compute() if hasattr(kdt, "_enlarge_box"): self._enlarge_box: Box = kdt._enlarge_box if hasattr(kdt, "_enlarge_data"): self._enlarge_data: pl.DataFrame = kdt._enlarge_data self.verlet_list, self.distance_list = kdt.indices_py, kdt.distances_py self.neighbor_number = np.full(self.verlet_list.shape[0], k, np.int32)
@staticmethod def _normalize_bond_cutoff( rc: Union[float, Dict[Tuple[Union[int, str], Union[int, str]], float], np.ndarray], data: pl.DataFrame, ) -> Tuple[float, np.ndarray, np.ndarray]: if np.isscalar(rc): rc = float(rc) assert rc > 0, "rc should be larger than 0." return rc, np.zeros(data.shape[0], np.int32), np.array([[rc]], float) if isinstance(rc, dict): assert len(rc) > 0, "pairwise rc should not be empty." first_pair = next(iter(rc)) assert len(first_pair) == 2, "pairwise rc key should have two entries." first_value = first_pair[0] if isinstance(first_value, str): assert "element" in data.columns, ( "Data must contain element column for element-pair bond cutoff." ) labels = data["element"].to_numpy() unique_labels = np.unique(labels) else: assert "type" in data.columns, ( "Data must contain type column for type-pair bond cutoff." ) labels = data["type"].to_numpy(allow_copy=False) unique_labels = np.unique(labels) compact_type = np.searchsorted(unique_labels, labels).astype(np.int32) ntype = unique_labels.shape[0] cutoff_matrix = np.full((ntype, ntype), -1.0, float) label_to_index = {value: idx for idx, value in enumerate(unique_labels.tolist())} for pair, cutoff in rc.items(): assert len(pair) == 2, "pairwise rc key should have two type indices." i, j = pair[0], pair[1] assert i in label_to_index and j in label_to_index, ( f"type/element pair {(i, j)} is not in current system." ) cutoff = float(cutoff) assert cutoff > 0, "pairwise rc should be larger than 0." ii, jj = label_to_index[i], label_to_index[j] cutoff_matrix[ii, jj] = cutoff cutoff_matrix[jj, ii] = cutoff if np.any(cutoff_matrix < 0): missing_pairs = [] for i in range(ntype): for j in range(i, ntype): if cutoff_matrix[i, j] < 0: missing_pairs.append((unique_labels[i], unique_labels[j])) raise ValueError(f"Missing rc for type pairs: {missing_pairs}") else: assert "type" in data.columns, ( "Data must contain type column for matrix bond cutoff." ) type_list = data["type"].to_numpy(allow_copy=False) unique_labels = np.unique(type_list) compact_type = np.searchsorted(unique_labels, type_list).astype(np.int32) ntype = unique_labels.shape[0] cutoff_matrix = np.asarray(rc, float) assert cutoff_matrix.ndim == 2, "pairwise rc should be a 2D array." assert cutoff_matrix.shape == (ntype, ntype), ( f"pairwise rc shape should be {(ntype, ntype)}." ) assert np.all(cutoff_matrix > 0), "pairwise rc should be larger than 0." return float(cutoff_matrix.max()), compact_type, cutoff_matrix
[docs] def create_bonds( self, rc: Union[ float, Dict[Tuple[Union[int, str], Union[int, str]], float], np.ndarray, ], max_neigh: Optional[int] = None, ) -> np.ndarray: """ Build bond pairs from neighbor information. Parameters ---------- rc : float, dict, or np.ndarray Bond cutoff definition. If a float is given, all bonds use the same cutoff. If a dict is given, it should map ``(type_i, type_j)`` or ``(element_i, element_j)`` to the corresponding cutoff. If a 2D array is given, its rows and columns follow the sorted unique atom types in the system. max_neigh : int, optional Maximum number of neighbors per atom when building the neighbor list. Returns ------- np.ndarray A 2D integer array of shape ``(Nbond, 2)`` containing 0-based atom indices for each bond pair. Notes ----- This method stores the result in ``self.bond`` and also returns it. """ if np.isscalar(rc): max_rc = float(rc) else: if isinstance(rc, dict): max_rc = float(np.max(np.asarray(list(rc.values()), float))) else: assert "type" in self.data.columns, ( "Data must contain type column for matrix bond cutoff." ) max_rc = float(np.max(np.asarray(rc, float))) assert max_rc > 0, "rc should be larger than 0." has_neigh = False if hasattr(self, "rc") and self.rc >= max_rc: has_neigh = True if not has_neigh: self.build_neighbor(max_rc, max_neigh) _, data = self._get_compute_view() _, compact_type, cutoff_matrix = self._normalize_bond_cutoff(rc, data) bond = np.asarray( _build_bond_pairs( self.verlet_list, self.distance_list, self.neighbor_number, compact_type, cutoff_matrix, ), dtype=np.int32, ) if bond.size == 0: self.bond = np.empty((0, 2), np.int32) return self.bond if hasattr(self, "_enlarge_data"): bond %= self.N bond.sort(axis=1) bond = bond[bond[:, 0] != bond[:, 1]] if bond.size == 0: self.bond = np.empty((0, 2), np.int32) return self.bond self.bond = np.unique(bond, axis=0) return self.bond
[docs] def delete_overlap(self, rc: float, max_neigh: Optional[int] = None) -> int: """ Delete overlapping atoms that are closer than ``rc`` to another atom. For every pair of atoms within ``rc`` (respecting periodic boundary conditions), the atom with the **larger** index is removed and the one with the smaller index is kept. An atom that has already been removed cannot itself trigger further removals, so each atom is deleted at most once and clusters of mutually overlapping atoms collapse to their single lowest-index member. Parameters ---------- rc : float Distance threshold. Atom pairs separated by less than ``rc`` are considered overlapping. max_neigh : int, optional Maximum number of neighbors per atom when building the neighbor list. If None, the buffer is sized automatically. Returns ------- int The number of atoms removed. Notes ----- This method modifies ``self.data`` in place; neighbor lists and cached calculator results are reset because atom indices change. Examples -------- >>> n_removed = system.delete_overlap(0.1) >>> print(f"removed {n_removed} overlapping atoms") """ rc = float(rc) assert rc > 0, "rc should be larger than 0." N = self.N has_neigh = hasattr(self, "rc") and self.rc >= rc if not has_neigh: self.build_neighbor(rc, max_neigh) verlet = self.verlet_list distance = self.distance_list neigh_num = self.neighbor_number # When the box was too small for `rc`, the neighbor list indexes # into the enlarged (replicated) system; map those indices back # to the original atoms. Cell 0 holds the originals at indices # 0..N-1, so `enlarged_index % N` recovers the original atom. verlet_orig = verlet % N if hasattr(self, "_enlarge_data") else verlet # Sequential sweep in index order: atom j is removed iff it has a # still-surviving neighbor with a smaller index. Because lower # indices are finalized before we reach j, one pass suffices. deleted = np.zeros(N, dtype=bool) for j in range(N): cnt = int(neigh_num[j]) if cnt == 0: continue nbr = verlet_orig[j, :cnt] d = distance[j, :cnt] smaller = nbr[(nbr < j) & (d < rc)] if smaller.size and not deleted[smaller].all(): deleted[j] = True n_removed = int(deleted.sum()) if n_removed == 0: return 0 keep = ~deleted self.update_data( self.data.filter(keep), reset_calculator=True, reset_neighbor=True, ) return n_removed
[docs] def cal_identify_diamond_structure(self) -> None: """ Identify diamond structure atoms. This will generate 'ids' column in self.data: - 0 Other - 1 Cubic diamond - 2 Cubic diamond (1st neighbor) - 3 Cubic diamond (2nd neighbor) - 4 Hexagonal diamond - 5 Hexagonal diamond (1st neighbor) - 6 Hexagonal diamond (2nd neighbor) Notes ----- See :class:`~mdapy.identify_diamond_structure.IdentifyDiamondStructure` for implementation details. """ verlet_list = None safe_L = 15 repeat = np.ceil(safe_L / self.box.get_thickness()).astype(int) for i in range(3): if self.box.boundary[i] == 0: repeat[i] = 1 if sum(repeat) == 3: if hasattr(self, "neighbor_number"): if self.neighbor_number.min() >= 4: tool.sort_neighbor( self.verlet_list, self.distance_list, self.neighbor_number, 4 ) verlet_list = self.verlet_list box, data = self._get_compute_view() ids = IdentifyDiamondStructure(data, box, verlet_list) ids.compute() self.update_data(self.__data.with_columns(ids=ids.pattern[: self.N]))
[docs] def cal_chill_plus(self, cutoff: float = 3.5) -> None: """ Run the CHILL+ water-phase identification on this system. The result is written to ``self.data['chill_plus']``: - 0 Other - 1 Hexagonal ice - 2 Cubic ice - 3 Interfacial ice - 4 Gas hydrate - 5 Interfacial gas hydrate The system must contain only the molecule centres (e.g. oxygen atoms or coarse-grained water beads); strip hydrogens beforehand. Parameters ---------- cutoff : float, optional O-O cutoff radius in Å. Default 3.5. Notes ----- See :class:`~mdapy.chill_plus.ChillPlus` for implementation details. """ has_neigh = hasattr(self, "rc") and self.rc >= cutoff if not has_neigh: self.build_neighbor(cutoff) box, data = self._get_compute_view() cp = ChillPlus( data, box, cutoff, self.verlet_list, self.distance_list, self.neighbor_number, ) cp.compute() self.update_data( self.__data.with_columns(chill_plus=cp.pattern[: self.N]) )
[docs] def cal_common_neighbor_parameter( self, rc: float, max_neigh: Optional[int] = None ) -> None: """ Calculate common neighbor parameter for structure analysis. This will save results to self.data['cnp']. Parameters ---------- rc : float Cutoff radius for neighbor search. max_neigh : int, optional Maximum number of neighbors per atom. Notes ----- See :class:`~mdapy.common_neighbor_parameter.CommonNeighborParameter` for implementation details. """ has_neigh = False if hasattr(self, "rc"): if self.rc >= rc: has_neigh = True if not has_neigh: self.build_neighbor(rc, max_neigh) box, data = self._get_compute_view() cnp = CommonNeighborParameter( data, box, rc, self.verlet_list, self.distance_list, self.neighbor_number ) cnp.compute() self.update_data(self.__data.with_columns(cnp=cnp.cnp[: self.N]))
[docs] def cal_ackland_jones_analysis(self) -> None: """ Perform Ackland-Jones structure analysis. This will save results to self.data['aja']: - 0 = Other (unknown coordination) - 1 = FCC (face-centered cubic) - 2 = HCP (hexagonal close-packed) - 3 = BCC (body-centered cubic) - 4 = ICO (icosahedral coordination) Notes ----- See :class:`~mdapy.ackland_jones_analysis.AcklandJonesAnalysis` for implementation details. """ N_neigh = 14 if self.data.shape[0] < N_neigh and sum(self.box.boundary) == 0: self.update_data(self.__data.with_columns(pl.lit(0, pl.Int32).alias("aja"))) return if hasattr(self, "neighbor_number") and self.neighbor_number.min() >= N_neigh: tool.sort_neighbor( self.verlet_list, self.distance_list, self.neighbor_number, N_neigh ) else: self.build_nearest_neighbor(N_neigh) box, data = self._get_compute_view() aja = AcklandJonesAnalysis(data, box, self.verlet_list, self.distance_list) aja.compute() self.update_data(self.__data.with_columns(aja=aja.aja[: self.N]))
[docs] def cal_warren_cowley_parameter( self, rc: float, max_neigh: Optional[int] = None ) -> WarrenCowleyParameter: """ Calculate Warren-Cowley short-range order parameter. Parameters ---------- rc : float Cutoff radius for neighbor search. max_neigh : int, optional Maximum number of neighbors per atom. Returns ------- WarrenCowleyParameter Object containing Warren-Cowley parameters. Notes ----- See :class:`~mdapy.warren_cowley_parameter.WarrenCowleyParameter` for implementation details and returned attributes. Examples -------- >>> wcp = system.cal_warren_cowley_parameter(rc=3.0) >>> print(wcp.wcp) # Access Warren-Cowley parameters """ has_neigh = False if hasattr(self, "rc"): if self.rc >= rc: has_neigh = True if not has_neigh: self.build_neighbor(rc, max_neigh) _, data = self._get_compute_view() wcp = WarrenCowleyParameter(self.verlet_list, self.neighbor_number, data) wcp.compute() return wcp
[docs] def cal_atomic_temperature( self, rc: float, factor: float = 1.0, max_neigh: Optional[int] = None ) -> None: """ Calculate the local atomic temperature for each atom by analyzing the velocity fluctuations of the atom and its neighbors. We assume velocity in units of Å/fs, you can use `factor` to convert your velocity. This will save results to self.data['atomic_temp'] in the unit of K. Parameters ---------- rc : float Cutoff radius for local averaging. factor : float Scaling factor for velocities (e.g., for unit conversion). Default is 1.0. max_neigh : int, optional Maximum number of neighbors per atom. Notes ----- See :class:`~mdapy.atomic_temperature.AtomicTemperature` for implementation details. """ has_neigh = False if hasattr(self, "rc"): if self.rc >= rc: has_neigh = True if not has_neigh: self.build_neighbor(rc, max_neigh) _, data = self._get_compute_view() atomTemp = AtomicTemperature( data, self.verlet_list, self.distance_list, rc, factor ) atomTemp.compute() self.update_data(self.data.with_columns(atomic_temp=atomTemp.T[: self.N]))
[docs] def cal_steinhardt_bond_orientation( self, llist: Union[np.ndarray, List[int]], use_voronoi: bool = False, nnn: int = 0, rc: float = -1.0, average: bool = False, use_weight: bool = False, weight: Optional[np.ndarray] = None, wl: bool = False, wlhat: bool = False, a_face_area_threshold: float = -1, r_face_area_threshold: float = -1, identify_liquid: bool = False, threshold: float = 0.7, n_bond: int = 7, max_neigh: Optional[int] = None, ) -> None: r"""Calculate Steinhardt bond orientation order parameters. One can also use it to identify the solid or liquid atom. Parameters ---------- llist : np.ndarry or List[int] Degrees list, such as [4, 6, 8], should be positive, even integer. use_voronoi : bool If True, compute using Voronoi neighbor. Default is False. nnn : int If `use_voronoi` is False and this value is postive, compute using `nnn` nearest neighbors. Default is 0. rc : float If `use_voronoi` is False and `nnn` is zero, make sure this parameter is postive, and use it to build neighbor. Default is -1.0. average : bool If True, qlm will be averaged. Default is False. use_weight : bool If True, the neighbor will be weighed by `weight` array, otherwise the weight of each neighbor is 1.0. Default is False. weight : np.ndarray, optional If it is not None, it should be has the same shape with the `self.verlet_list`. If it is None and the `use_voronoi` is True, then the voronoi_face_area will be used as weight. Default is None. wl : bool If True, compute third-order invariant :math:`w_l` parameters. Default is False. wlhat : bool If True, compute normalized third-order invariant :math:`\hat{w}_l` parameters. Default is False. a_face_area_threshold : float Absolute face area threshold. Faces with area below this value are ignored as neighbors. Default is -1.0 (no filtering). r_face_area_threshold : float Relative face area threshold (fraction of average face area). Faces with relative area below this value are ignored. Default is -1.0 (no filtering). identify_liquid : bool Enable solid-liquid classification (requires :math:`l=6` in llist). threshold : float Threshold for solid-liquid identification (default: 0.7 for normalized :math:`q_6`). n_bond : int Minimum number of "solid-like" bonds for solid classification. Default to 7. max_neigh : int, optional Maximum number of neighbors per atom. Notes ----- See :class:`~mdapy.steinhardt_bond_orientation.SteinhardtBondOrientation` for implementation details. The results will add to self.data['ql{l}'] for l in `llist`. If set `wl` is True, also will add self.data['wl{l}']. If set `wlhat` is True, also will add self.data['wlh{l}']. If set `identify_liquid`, it will add self.data['solidliquid'] and self.data['nbond'] results. """ if use_voronoi: self.build_voronoi_neighbor(a_face_area_threshold, r_face_area_threshold) verlet_list, distance_list, neighbor_number = ( self.voro_verlet_list, self.voro_distance_list, self.voro_neighbor_number, ) if use_weight and weight is None: weight = self.voro_face_area else: if nnn > 0: has_sort_neigh = False if hasattr(self, "neighbor_number"): if self.neighbor_number.min() >= nnn: tool.sort_neighbor( self.verlet_list, self.distance_list, self.neighbor_number, nnn, ) has_sort_neigh = True if not has_sort_neigh: self.build_nearest_neighbor(nnn) else: assert rc > 0, ( "At least use voronoi, or set positive nnn, or positive rc." ) if hasattr(self, "rc"): if self.rc < rc: self.build_neighbor(rc, max_neigh) else: self.build_neighbor(rc, max_neigh) verlet_list, distance_list, neighbor_number = ( self.verlet_list, self.distance_list, self.neighbor_number, ) box, data = self._get_compute_view() SBO = SteinhardtBondOrientation( box, data, np.asarray(llist, int), nnn, rc, average, use_voronoi, use_weight, weight, verlet_list, distance_list, neighbor_number, wl, wlhat, identify_liquid, threshold, n_bond, ) SBO.compute() new_data = self.data if SBO.qnarray.shape[1] > 1: columns = [f"ql{i}" for i in llist] if wl: columns.extend(f"wl{i}" for i in llist) if wlhat: columns.extend(f"wlh{i}" for i in llist) for i, name in enumerate(columns): new_data = new_data.with_columns( pl.lit(SBO.qnarray[: self.N, i]).alias(name) ) else: new_data = new_data.with_columns( pl.lit(SBO.qnarray.flatten()[: self.N]).alias(f"ql{llist[0]}") ) if identify_liquid: new_data = new_data.with_columns( pl.lit(SBO.solidliquid[: self.N]).alias("solidliquid"), pl.lit(SBO.nbond[: self.N]).alias("nbond"), ) self.update_data(new_data)
[docs] def cal_polyhedral_template_matching( self, structure="fcc-hcp-bcc", rmsd_threshold=0.1, return_ordering=False, return_rmsd=False, return_atomic_distance=False, return_orientation=False, identify_fcc_planar_faults=False, identify_esf=True, ): """ Identify crystal structures using polyhedral template matching, one can also further distiguish the planar defects in FCC structures. For PTM, structure results will add as self.data['ptm'] and self.data['ordering'] - Structure type (integer, 0=Other, 1=FCC, 2=HCP, 3=BCC, 4=ICO, 5=SC, 6=DCUB, 7=DHEX, 8=Graphene) - Ordering type (interger, 0=Other, 1=L10, 2=L12 (A-site), 3=L12 (B-site), 4=B2, 5=zincblende / wurtzite) For planar defects, structure results will add as self.data['pft'] - 0: Non-hcp atoms (e.g. perfect fcc or disordered) - 1: Indeterminate hcp-like (isolated hcp-like atoms, not forming a planar defect) - 2: Intrinsic stacking fault (ISF, two adjacent hcp-like layers) - 3: Coherent twin boundary (TB, one hcp-like layer) - 4: Multi-layer stacking fault (three or more adjacent hcp-like layers) - 5: Extrinsic Stacking Fault (ESF, if cal_esf=True) Parameters ---------- structure : str String specifying the structure types to consider, separated by hyphens (Defaults is "fcc-hcp-bcc"). Supported values: "fcc", "hcp", "bcc", "ico", "sc", "dcub", "dhex", "graphene". Special values: "all" (all types), "default" (fcc, hcp, bcc). rmsd_threshold : float Maximum RMSD for a valid structure match. Particles exceeding this are classified as "Other". Default is 0.1. return_ordering : bool If True, return the ordering type. Default is False. return_rmsd : bool If True, return the RMSD value. Default is False. return_atomic_distance : bool If True, return the interatomic distance. Default is False. return_orientation : bool If True, return the orientation value. Default is False. identify_fcc_planar_faults : bool If True, further detect the plannar defects. Default is False. identify_esf : bool If True, further defect the ESF. Default is True. Notes ----- See :class:`~mdapy.polyhedral_template_matching.PolyhedralTemplateMatching` and class:`~mdapy.identify_fcc_planar_faults.IdentifyFccPlanarFaults` for implementation details. """ verlet_list = None safe_L = 15 repeat = np.ceil(safe_L / self.box.get_thickness()).astype(int) for i in range(3): if self.box.boundary[i] == 0: repeat[i] = 1 if sum(repeat) == 3: if hasattr(self, "neighbor_number"): if self.neighbor_number.min() >= 18: tool.sort_neighbor( self.verlet_list, self.distance_list, self.neighbor_number, 18 ) verlet_list = self.verlet_list box, data = self._get_compute_view() ptm = PolyhedralTemplateMatching( structure, data, box, rmsd_threshold, verlet_list ) ptm.compute() output = ptm.output[: self.N] data = self.__data.with_columns(pl.lit(output[:, 0], pl.Int32).alias("ptm")) if return_ordering: data = data.with_columns(pl.lit(output[:, 1]).alias("ordering")) if return_rmsd: data = data.with_columns(pl.lit(output[:, 2]).alias("rmsd")) if return_atomic_distance: data = data.with_columns(pl.lit(output[:, 3]).alias("interatomic_distance")) if return_orientation: data = data.with_columns( qx=output[:, 5], qy=output[:, 6], qz=output[:, 7], qw=output[:, 4], ) if identify_fcc_planar_faults: structure_types = np.array(ptm.output[:, 0], np.int32) ptm_indices = np.ascontiguousarray(ptm.ptm_indices[:, 1:13]) ifpt = IdentifyFccPlanarFaults(structure_types, ptm_indices, identify_esf) ifpt.compute() data = data.with_columns(pl.lit(ifpt.fault_types[: self.N]).alias("pft")) self.update_data(data)
[docs] def cal_centro_symmetry_parameter(self, N: int): """ Calculate centro-symmetry parameter for defect identification. The results will be added as self.data['csp']. Parameters ---------- N : int Number of nearest neighbors to consider, should be a positive, even integer. 12 for FCC and 8 for BCC. Notes ----- See :class:`~mdapy.centro_symmetry_parameter.CentroSymmetryParameter` for implementation details. """ assert N % 2 == 0 and N > 0, f"N must be a positive even number: {N}." if self.N <= N and sum(self.box.boundary) == 0: res = np.full(self.N, 10000, float) else: has_verlet = False if hasattr(self, "neighbor_number"): if self.neighbor_number.min() >= N and hasattr(self, "rc"): tool.sort_neighbor( self.verlet_list, self.distance_list, self.neighbor_number, N ) has_verlet = True if not has_verlet: self.build_nearest_neighbor(N) box, data = self._get_compute_view() csp = CentroSymmetryParameter(data, box, N, self.verlet_list) csp.compute() res = csp.csp[: self.N] self.update_data(self.data.with_columns(csp=res))
[docs] def cal_common_neighbor_analysis( self, rc: Optional[float] = None, max_neigh: Optional[int] = None ): """ Perform common neighbor analysis for structure identification. This will generate 'cna' column in self.data: - 0 = Other (unknown coordination) - 1 = FCC (face-centered cubic) - 2 = HCP (hexagonal close-packed) - 3 = BCC (body-centered cubic) - 4 = ICO (icosahedral coordination) Parameters ---------- rc : float, optional Cutoff radius. If not given, it will use adaptive mode to do CNA. max_neigh : int, optional Maximum number of neighbors per atom. Notes ----- See :class:`~mdapy.common_neighbor_analysis.CommonNeighborAnalysis` for implementation details. """ verlet_list = None neighbor_number = None safe_L = 15 repeat = np.ceil(safe_L / self.box.get_thickness()).astype(int) for i in range(3): if self.box.boundary[i] == 0: repeat[i] = 1 if sum(repeat) == 3: if hasattr(self, "rc"): if rc is None: if self.neighbor_number.min() >= 14: tool.sort_neighbor( self.verlet_list, self.distance_list, self.neighbor_number, 14, ) verlet_list = self.verlet_list else: if self.rc < rc: self.build_neighbor(rc, max_neigh) verlet_list = self.verlet_list neighbor_number = self.neighbor_number else: if rc is not None: self.build_neighbor(rc, max_neigh) verlet_list = self.verlet_list neighbor_number = self.neighbor_number box, data = self._get_compute_view() cna = CommonNeighborAnalysis(data, box, verlet_list, neighbor_number, rc) cna.compute() self.update_data(self.__data.with_columns(cna=cna.pattern[: self.N]))
[docs] def cal_structure_factor( self, k_min: float, k_max: float, nbins: int, cal_partial: bool = False, atomic_form_factors: bool = False, mode: str = "direct", rc: Optional[float] = None, nbin_rdf: int = 200, window: bool = False, ) -> StructureFactor: """ Calculate static structure factor S(k). Parameters ---------- k_min : float Minimum wave vector magnitude. k_max : float Maximum wave vector magnitude. nbins : int Number of bins for k-space averaging. cal_partial : bool, optional If True, calculate partial structure factors. Default is False. atomic_form_factors : bool, default=False If True, use atomic form factors f to weigh the atoms' individual contributions to S(k). Atomic form factors are taken from `TU Graz <https://lampz.tugraz.at/~hadley/ss1/crystaldiffraction/atomicformfactors/formfactors.php>`_. mode : str, optional Calculation mode: 'direct', 'debye', or 'rdf'. Default is 'direct'. rc : float, optional Cutoff for the RDF intermediate (only used by ``mode='rdf'``). Defaults to half of the smallest periodic box thickness. nbin_rdf : int, default=200 Number of radial bins for the RDF (only used by ``mode='rdf'``). window : bool, default=True Apply the Lorch window when transforming g(r) → S(k) (only used by ``mode='rdf'``). Returns ------- StructureFactor Object containing structure factor data. Notes ----- See :class:`~mdapy.structure_factor.StructureFactor` for implementation details and returned attributes. """ sfc = StructureFactor( self.data, self.box, k_min, k_max, nbins, cal_partial, atomic_form_factors, mode, rc=rc, nbin_rdf=nbin_rdf, window=window, ) sfc.compute() return sfc
[docs] def cal_bond_analysis( self, rc: float, nbin: int, max_neigh: Optional[int] = None ) -> BondAnalysis: """ Analyze bond length and angle distributions. Parameters ---------- rc : float Cutoff radius for bond search. nbin : int Number of bins for histograms. max_neigh : int, optional Maximum number of neighbors per atom. Returns ------- BondAnalysis Object containing bond distribution data. Notes ----- See :class:`~mdapy.bond_analysis.BondAnalysis` for implementation details and returned attributes. Examples -------- >>> ba = system.cal_bond_analysis(rc=5.0, nbin=100) >>> print(ba.bond_length_distribution) """ has_neigh = False if hasattr(self, "rc"): if self.rc >= rc: has_neigh = True if not has_neigh: self.build_neighbor(rc, max_neigh) box, data = self._get_compute_view() ba = BondAnalysis( data, box, rc, nbin, self.verlet_list, self.distance_list, self.neighbor_number, ) ba.compute() return ba
[docs] def cal_angular_distribution_function( self, rc_dict: Dict[str, List[float]], nbin: int, max_neigh: Optional[int] = None, ) -> AngularDistributionFunction: """ Calculate angular distribution function (ADF). Parameters ---------- rc_dict : dict of str to list of float Dictionary mapping triplet patterns to cutoff radii. Format: {'A-B-C': [rc_AB_min, rc_AB_max, rc_AC_min, rc_AC_max]} where A, B, C are element symbols and A is the central atom. Example: {'O-H-H': [0, 2.0, 0, 2.0]} for water molecules. nbin : int Number of angular bins. max_neigh : int, optional Maximum number of neighbors per atom. Returns ------- AngularDistributionFunction Object containing ADF data. Notes ----- See :class:`~mdapy.angular_distribution_function.AngularDistributionFunction` for implementation details and returned attributes. """ assert "element" in self.data.columns rc = np.array(list(rc_dict.values())).max() has_neigh = False if hasattr(self, "rc"): if self.rc >= rc: has_neigh = True if not has_neigh: self.build_neighbor(rc, max_neigh) box, data = self._get_compute_view() adf = AngularDistributionFunction( data, box, rc_dict, nbin, self.verlet_list, self.distance_list, self.neighbor_number, ) adf.compute() return adf
[docs] def cal_radial_distribution_function( self, rc: float, nbin: int = 100, max_neigh: Optional[int] = None, streaming: Optional[bool] = None, ) -> RadialDistributionFunction: """ Calculate radial distribution function g(r). Parameters ---------- rc : float Maximum distance for RDF calculation. nbin : int, optional Number of distance bins. Default is 100. max_neigh : int, optional Maximum number of neighbors per atom (only used by the Verlet path). streaming : bool, optional Selects the kernel: * ``None`` (default): auto-pick — streaming when ``rc`` is large relative to the box (``2*rc`` exceeds the smallest periodic thickness divided by 3, i.e. the cutoff that makes the Verlet list bloat up); Verlet path otherwise. * ``True``: force the streaming kernel (no neighbour list built). * ``False``: force the Verlet path (build/reuse a neighbour list). Returns ------- RadialDistributionFunction Object containing RDF data and partial RDFs if applicable. Notes ----- See :class:`~mdapy.radial_distribution_function.RadialDistributionFunction` for implementation details and returned attributes. """ box, data = self._get_compute_view() # Auto-pick streaming when rc is so large that a Verlet list would # store ~all atoms per atom. Threshold: rc above 1/3 of the smallest # periodic thickness — at that point the cell list degenerates anyway # so the streaming kernel's memory savings are the deciding factor. if streaming is None: try: thickness = box.get_thickness() except AttributeError: thickness = np.linalg.norm(box.box, axis=1) periodic_thicknesses = [ thickness[i] for i in range(3) if box.boundary[i] ] min_thick = min(periodic_thicknesses) if periodic_thicknesses else float("inf") streaming = rc >= min_thick / 3.0 # Pick species labels for the partial-RDF dict keys: prefer chemical # ``element`` strings (so g_partial keys read as ``("Al", "Cu")``); # fall back to integer ``type`` ids; otherwise everyone is species 0. def _species_labels(view): if "element" in view.columns: return view["element"].to_numpy() if "type" in view.columns: return view["type"].to_numpy() return np.zeros(view.shape[0], np.int32) type_list = _species_labels(data) if streaming: # Match the legacy Verlet path's replication semantics: when the # original box is too thin for the requested rc (rc > L/2 along # any periodic axis), replicate just enough that rc fits the # min-image convention in the enlarged cell. Streaming on the # enlarged cell is still cheap because we only carry positions, # not a Verlet list. repeat = self.box.check_small_box(rc) if sum(repeat) != 3: from mdapy.tool_function import replicate rep_data, rep_box = replicate(data, box, *repeat) rep_x = rep_data["x"].to_numpy() rep_y = rep_data["y"].to_numpy() rep_z = rep_data["z"].to_numpy() rep_types = _species_labels(rep_data) rdf = RadialDistributionFunction( rc, nbin, rep_box, type_list=rep_types, streaming=True, x=rep_x, y=rep_y, z=rep_z, ) else: rdf = RadialDistributionFunction( rc, nbin, box, type_list=type_list, streaming=True, x=data["x"].to_numpy(), y=data["y"].to_numpy(), z=data["z"].to_numpy(), ) else: has_neigh = False if hasattr(self, "rc"): if self.rc >= rc: has_neigh = True if not has_neigh: self.build_neighbor(rc, max_neigh) # Refresh box/data: build_neighbor may have replicated the system # into _enlarge_box/_enlarge_data; the verlet list now indexes # into the enlarged frame, so type_list must come from there too. box, data = self._get_compute_view() type_list = _species_labels(data) rdf = RadialDistributionFunction( rc, nbin, box, verlet_list=self.verlet_list, distance_list=self.distance_list, neighbor_number=self.neighbor_number, type_list=type_list, ) rdf.compute() return rdf
[docs] def average_by_neighbor( self, average_rc: float, property_name: str, include_self: bool = True, output_name: Optional[str] = None, max_neigh: Optional[int] = None, ) -> None: """ Average a property over local neighborhoods. Parameters ---------- average_rc : float Cutoff radius for local averaging. property_name : str Name of the column in data to average. include_self : bool Whether to include the central atom in averaging. Default is True. output_name : str, optional Name for the output column. If None, uses '{property_name}_ave'. max_neigh : int, optional Maximum number of neighbors per atom. Raises ------ AssertionError If property_name is not a column in data. """ assert property_name in self.__data.columns, f"{property_name} not in data." if hasattr(self, "rc"): if self.rc < average_rc: self.build_neighbor(average_rc, max_neigh) else: self.build_neighbor(average_rc, max_neigh) assert not hasattr(self, "_enlarge_data"), ( "average_by_neighbor only supports systems whose box is large enough " "that no replica was built (i.e. self._enlarge_data must not exist)." ) data = self.data data = tool.average_by_neighbor( average_rc, data, property_name, self.verlet_list, self.distance_list, self.neighbor_number, include_self, output_name, )[: self.N].rechunk() self.update_data(data)
[docs] def cal_cluster_analysis( self, rc: Union[float, int, Dict[str, float]] = 5.0, max_neigh: Optional[int] = None, ) -> None: """ Perform cluster analysis to identify connected atomic groups. The results will be added as self.data['cluster_id']. Parameters ---------- rc : float, int, or dict, optional Cutoff radius for cluster connectivity. Can be: * **float/int**: Single cutoff for all atom pairs * **dict**: Type-specific cutoffs, e.g., {'1-1': 1.5, '1-2': 1.3} Default is 5.0. max_neigh : int, optional Maximum number of neighbors per atom. Notes ----- See :class:`~mdapy.cluster_analysis.ClusterAnalysis` for implementation details. Examples -------- Single cutoff: >>> system.cal_cluster_analysis(rc=3.0) Type-specific cutoffs: >>> rc_dict = {"1-1": 2.8, "1-2": 3.0, "2-2": 3.2} >>> system.cal_cluster_analysis(rc=rc_dict) >>> # cluster_id column added to system.data """ if isinstance(rc, (int, float, np.integer, np.floating)): max_rc = float(rc) elif isinstance(rc, dict): max_rc = max([i for i in rc.values()]) else: raise TypeError( "rc should be a positive number, or a dict like {'1-1':1.5, '1-2':1.3}" ) if hasattr(self, "rc"): if self.rc < max_rc: self.build_neighbor(max_rc, max_neigh) else: self.build_neighbor(max_rc, max_neigh) type_list = None if isinstance(rc, dict): assert "type" in self.data.columns, ( "Must have type for multi rc cluster calculation." ) type_list = self.data["type"].to_numpy(allow_copy=False) ca = ClusterAnalysis( rc, self.verlet_list, self.distance_list, self.neighbor_number, type_list ) ca.compute() self.update_data( self.data.with_columns(cluster_id=ca.particleClusters[: self.N]) )
[docs] def cal_structure_entropy( self, rc: float, sigma: float, use_local_density: bool = False, average_rc: float = 0.0, max_neigh: Optional[int] = None, ) -> None: """ Calculate structural entropy for disorder quantification. Parameters ---------- rc : float Cutoff radius for neighbor search. sigma : float Width parameter for Gaussian kernel. use_local_density : bool, optional If True, use local density normalization. Default is False. average_rc : float, optional Radius for local averaging of entropy. If 0, no averaging. Default is 0.0. max_neigh : int, optional Maximum number of neighbors per atom. Notes ----- See :class:`~mdapy.structure_entropy.StructureEntropy` for implementation details. Examples -------- >>> system.cal_structure_entropy(rc=5.0, sigma=0.5) >>> # entropy column added to system.data With local averaging: >>> system.cal_structure_entropy(rc=5.0, sigma=0.5, average_rc=3.0) >>> # entropy and entropy_ave columns added to system.data """ if hasattr(self, "rc"): if self.rc < rc: self.build_neighbor(rc, max_neigh) else: self.build_neighbor(rc, max_neigh) box, _ = self._get_compute_view() SE = StructureEntropy( box, self.verlet_list, self.distance_list, self.neighbor_number, rc, sigma, use_local_density, average_rc, ) SE.compute() data = self.data.with_columns(entropy=SE.entropy[: self.N]) if average_rc > 0: data = data.with_columns(entropy_ave=SE.entropy_ave[: self.N]) self.update_data(data)
[docs] def cal_voronoi_volume(self) -> None: """ Calculate Voronoi cell volumes and related properties. This method computes the Voronoi tessellation and adds three columns to the data DataFrame: * **volume**: Atomic volume from Voronoi cell * **neighbor_number**: Coordination number from Voronoi neighbors * **cavity_radius**: This is the distance from the atom to the farthest vertex of its Voronoi cell, representing the radius of the largest empty sphere (containing no other atoms) that touches the atom. Notes ----- See :class:`~mdapy.voronoi.Voronoi` for implementation details. Examples -------- >>> system.cal_voronoi_volume() >>> # volume, neighbor_number, cavity_radius columns added to data >>> avg_volume = system.data["volume"].mean() """ vor = Voronoi(self.box, self.data) volume, neighbor_number, cavity_radius = vor.get_volume() self.update_data( self.data.with_columns( volume=volume, neighbor_number=neighbor_number, cavity_radius=cavity_radius, ) )
[docs] def cal_chemical_species( self, search_species: Optional[List[str]] = None, element_list: Optional[List[str]] = None, check_most: int = 10, add_mol_id: bool = False, scale: float = 0.6, ) -> Dict[str, int]: """ Identify chemical species based on atomic connectivity. Two atoms *i* and *j* are considered bonded if their interatomic distance satisfies: rij <= (vdW_radius_i + vdW_radius_j) * scale where vdW_radius is the van der Waals radius of the corresponding element, and ``scale`` is an empirical scaling factor. This approach is similar to the connectivity detection method used in OpenBabel. Parameters ---------- search_species : list of str, optional Molecular formulas to search for, e.g. ``['H2O', 'CO2', 'Cl2', 'N2']``. If provided, only these species will be counted. Otherwise, the most frequent species will be returned. element_list : list of str, optional List of element symbols corresponding to atom types, e.g. ``['C', 'H', 'O']``. This is required if the ``'element'`` column is not present in ``self.data``. check_most : int, default=10 Number of most frequent species to return when ``search_species`` is None. add_mol_id : bool, default=False If True and ``search_species`` is provided, add a new column ``'mol_id'`` to the internal data. The molecule IDs are assigned according to the order of ``search_species`` (zero-based index). Atoms not belonging to any searched species are assigned ``-1``. scale : float, default=0.6 Scaling factor applied to the sum of van der Waals radii when determining bonding cutoff distances. Returns ------- dict of str to int Dictionary mapping species formulas to their corresponding counts. Examples -------- >>> species = system.cal_chemical_species(["H2O", "CO2"]) >>> print(species) {'H2O': 125, 'CO2': 42} """ import re if "element" in self.data.columns: element_list = self.data["element"].unique().sort() ele2type = {j: i + 1 for i, j in enumerate(element_list)} self.update_data( self.data.with_columns( pl.col("element").replace_strict(ele2type).rechunk().alias("type") ) ) else: assert element_list is not None, ( "'element_list' must be provided because no 'element' column " "is present in the data." ) assert "type" in self.data.columns, ( "'type' column is required when 'element' column is not available." ) Ntype = self.data["type"].max() assert len(element_list) >= Ntype, ( f"'element_list' must contain at least {Ntype} elements to match " f"the maximum atom type index ({Ntype})." ) partial_cutoff = {} for type1 in range(len(element_list)): for type2 in range(type1, len(element_list)): partial_cutoff[f"{type1 + 1}-{type2 + 1}"] = ( vdw_radii[atomic_numbers[element_list[type1]]] + vdw_radii[atomic_numbers[element_list[type2]]] ) * scale self.cal_cluster_analysis(partial_cutoff, max_neigh=None) if search_species is not None: pattern = r"([A-Z][a-z]?)(\d*)" trans_search_species = [] for old_name in search_species: matches = re.findall(pattern, old_name) matches.sort() name = "" for i, j in matches: if len(j): name += i * int(j) else: name += i trans_search_species.append(name) first_step = ( self.__data.with_columns( pl.lit( np.array(element_list)[(self.__data["type"] - 1).to_numpy()] ).alias("type_name") ) .group_by("cluster_id") .agg(pl.col("type_name")) .with_columns(pl.col("type_name").list.sort()) .with_columns(pl.col("type_name").list.join("")) ) res = (first_step["type_name"].value_counts()).filter( pl.col("type_name").is_in(trans_search_species) ) res = dict(zip(res[:, 0], res[:, 1])) species = {} for i, j in zip(search_species, trans_search_species): if j not in res.keys(): species[i] = 0 else: species[i] = res[j] if add_mol_id: clus_mol = first_step.with_columns( pl.col("type_name") .replace_strict( {j: i for i, j in enumerate(trans_search_species)}, default=-1 ) .rechunk() .alias("mol_id") ) self.update_data( self.data.with_columns( pl.col("cluster_id") .replace_strict( dict(zip(clus_mol["cluster_id"], clus_mol["mol_id"])) ) .rechunk() .alias("mol_id") ) ) else: res = ( ( self.__data.with_columns( pl.lit( np.array(element_list)[(self.__data["type"] - 1).to_numpy()] ).alias("type_name") ) .group_by("cluster_id") .agg(pl.col("type_name")) .with_columns(pl.col("type_name").list.sort()) .with_columns(pl.col("type_name").list.join(""))["type_name"] .value_counts() ) .sort("count", descending=True) .limit(check_most) ) species = dict(zip(res[:, 0], res[:, 1])) return species
if __name__ == "__main__": system = System("tests/input_files/water.xyz") # res = system.cal_chemical_species(search_species=['H2O'], add_mol_id=True) res = system.cal_chemical_species(scale=0.4) print(res) # system.write_xyz('mol.xyz')