Source code for mdapy.tool_function

# Copyright (c) 2022-2026, Yongchao Wu in Aalto University
# This file is from the mdapy project, released under the BSD 3-Clause License.

from mdapy import _neighbor, _repeat_cell, _split
from mdapy.parallel import get_num_threads
from typing import Optional, Tuple
import numpy as np
import polars as pl
from mdapy.box import Box
from mdapy.data import atomic_masses, atomic_numbers
import os


[docs] def average_by_neighbor( average_rc: float, data: pl.DataFrame, property_name: str, verlet_list: np.ndarray, distance_list: np.ndarray, neighbor_number: np.ndarray, include_self: bool = True, output_name: Optional[str] = None, ) -> pl.DataFrame: """ Compute the averaged property of each atom based on its neighbors within a given cutoff radius. This function calculates the local average of a specified property (e.g., temperature, stress) for each atom by averaging over its neighboring atoms within the cutoff ``average_rc``. The neighbor information is provided through the Verlet list and corresponding distances. Parameters ---------- average_rc : float The cutoff radius used for averaging the property. data : pl.DataFrame The input atomic data containing the property to be averaged. property_name : str The name of the column in ``data`` to average. verlet_list : np.ndarray A 2D array of neighbor indices. Each row corresponds to one atom and lists its neighbors. distance_list : np.ndarray A 2D array of distances corresponding to the ``verlet_list`` entries. neighbor_number : np.ndarray A 1D array indicating the actual number of neighbors for each atom. include_self : bool, optional, default=True Whether to include the atom itself when computing the average. output_name : str, optional The name of the output column to store the averaged property. If not provided, defaults to ``f"{property_name}_ave"``. Returns ------- pl.DataFrame A new DataFrame containing the original data plus an additional column with the averaged property values. """ assert property_name in data.columns, f"{property_name} not in data." property_ave = np.zeros(data.shape[0]) _neighbor.average_by_neighbor( average_rc, verlet_list, distance_list, neighbor_number, data[property_name].to_numpy(allow_copy=False), property_ave, include_self, get_num_threads(), ) if output_name is None: output_name = f"{property_name}_ave" return data.with_columns(pl.lit(property_ave).alias(output_name))
[docs] def sort_neighbor( verlet_list: np.ndarray, distance_list: np.ndarray, neighbor_number: np.ndarray, k: int, ): """ Sort the first ``k`` neighbors of each atom by ascending distance. This function reorders the first ``k`` neighbor entries in the Verlet list and distance list so that the nearest neighbors appear first. Parameters ---------- verlet_list : np.ndarray A 2D array of neighbor indices. Each row corresponds to one atom and lists its neighbors. distance_list : np.ndarray A 2D array of distances corresponding to the ``verlet_list`` entries. neighbor_number : np.ndarray A 1D array indicating the actual number of neighbors for each atom. k : int The number of nearest neighbors to sort. Must be less than or equal to the minimum value in ``neighbor_number``. Raises ------ AssertionError If any atom has fewer than ``k`` neighbors. Examples -------- >>> verlet = np.array([[3, 1, 2], [0, 2, 3]]) >>> distance = np.array([[0.3, 0.1, 0.2], [0.5, 0.2, 0.4]]) >>> neighbor_num = np.array([3, 3]) >>> sort_neighbor(verlet, distance, neighbor_num, k=2) >>> verlet array([[1, 2, 3], [2, 3, 0]]) >>> distance array([[0.1, 0.2, 0.3], [0.2, 0.4, 0.5]]) """ minNumber = neighbor_number.min() assert minNumber >= k, f"The min neighbor number {minNumber} is lower than k {k}." _neighbor.sort_verlet_by_distance(verlet_list, distance_list, k, get_num_threads())
[docs] def wrap_pos(data: pl.DataFrame, box: Box) -> pl.DataFrame: """Wrap position into box based on PBC. Args: data (pl.DataFrame): atom information. box (Box): box information Returns: pl.DataFrame: DataFrame with new wraped position. """ x, y, z = ( data["x"].to_numpy(writable=True), data["y"].to_numpy(writable=True), data["z"].to_numpy(writable=True), ) _neighbor.wrap_positions(x, y, z, box.box, box.origin, box.boundary, get_num_threads()) return data.with_columns(x=x, y=y, z=z)
[docs] def replicate( data: pl.DataFrame, box: Box, nx: int, ny: int, nz: int ) -> Tuple[pl.DataFrame, Box]: """Replicates atomic data along the x, y, and z directions. This function creates a supercell by replicating the input atomic positions and associated data `nx` times along x, `ny` times along y, and `nz` times along z. The input DataFrame is expected to contain at least 'x', 'y', and 'z' columns for atomic coordinates. Args: data: Atomic information, including positions in 'x', 'y', 'z' columns. box: Simulation box information to be scaled. nx: Number of replications along the x direction. ny: Number of replications along the y direction. nz: Number of replications along the z direction. Returns: A tuple containing the replicated DataFrame (with updated positions and optionally renumbered 'id' column) and the scaled Box object. """ old_box = box.box old_pos = data.select("x", "y", "z").to_numpy() n_old = old_pos.shape[0] total = n_old * nx * ny * nz * 3 new_pos = np.zeros(total, dtype=np.float64) _repeat_cell.repeat_cell(new_pos, old_box, old_pos, nx, ny, nz, get_num_threads()) new_pos = new_pos.reshape((-1, 3)) new_box = old_box * np.array([nx, ny, nz]).reshape((3, 1)) new_data = pl.concat([data] * nx * ny * nz).with_columns( x=new_pos[:, 0], y=new_pos[:, 1], z=new_pos[:, 2] ) if "id" in new_data.columns: new_data = new_data.select(pl.all().exclude("id")).with_row_index( "id", offset=1 ) return new_data.rechunk(), Box(new_box, box.boundary, box.origin)
def _replicate_pos( data: pl.DataFrame, box: Box, nx: int, ny: int, nz: int ) -> Tuple[pl.DataFrame, Box]: old_box = box.box old_pos = data.select("x", "y", "z").to_numpy() n_old = old_pos.shape[0] total = n_old * nx * ny * nz * 3 new_pos = np.zeros(total, dtype=np.float64) _repeat_cell.repeat_cell(new_pos, old_box, old_pos, nx, ny, nz, get_num_threads()) new_pos = new_pos.reshape((-1, 3)) new_box = old_box * np.array([nx, ny, nz]).reshape((3, 1)) new_data = pl.from_numpy(new_pos, schema=["x", "y", "z"]) return new_data.rechunk(), Box(new_box, box.boundary, box.origin) def _set_pka( data: pl.DataFrame, box: Box, energy: float, direction: np.ndarray, index: int = None, element: str = None, ) -> pl.DataFrame: required_cols = ["x", "y", "z", "element", "vx", "vy", "vz"] for col in required_cols: if col not in data.columns: raise ValueError(f"Must include '{col}' column in data.") direction = np.array(direction, dtype=float) if direction.shape != (3,): raise ValueError("Direction must be a 3D vector.") element_unique = data["element"].unique() has_amass = True if "amass" not in data.columns: ele2mass = {} for ele in element_unique: if ele not in atomic_numbers.keys(): raise ValueError(f"Unknown element '{ele}' in atomic_numbers.") ele2mass[ele] = atomic_masses[atomic_numbers[ele]] data = data.with_columns( pl.col("element").replace_strict(ele2mass).rechunk().alias("amass") ) has_amass = False totalmass = data["amass"].sum() if index is None: cx, cy, cz = box.box @ np.array([0.5, 0.5, 0.5]) + box.origin dist_expr: pl.Expr = ( (pl.col("x") - cx) ** 2 + (pl.col("y") - cy) ** 2 + (pl.col("z") - cz) ** 2 ) if element is None: index = data.select(dist_expr.arg_min()).item() else: if element not in element_unique: raise ValueError(f"Element '{element}' not in data.") ele_filtered = data.with_row_index("row_idx").filter( pl.col("element") == element ) local_min = ele_filtered.select(dist_expr.arg_min()).item() index = ele_filtered["row_idx"][local_min] else: if index < 0 or index >= len(data): raise ValueError(f"Index {index} out of bounds.") if element is not None and data.select(pl.col("element"))[index, 0] != element: raise ValueError(f"Element at index {index} is not '{element}'.") atom_mass = data["amass"][index] speed = np.sqrt(2 * energy / atom_mass) norm_dir = direction / np.linalg.norm(direction) # We make sure the input velocity in the units of A/fs. newv = speed * norm_dir / 10.18051 # eV/amu to A/fs data[index, "vx"] = newv[0] data[index, "vy"] = newv[1] data[index, "vz"] = newv[2] com_vx = data.select((pl.col("amass") * pl.col("vx")).sum() / totalmass).item() com_vy = data.select((pl.col("amass") * pl.col("vy")).sum() / totalmass).item() com_vz = data.select((pl.col("amass") * pl.col("vz")).sum() / totalmass).item() data = data.with_columns( pl.col("vx") - com_vx, pl.col("vy") - com_vy, pl.col("vz") - com_vz, ) if not has_amass: data = data.drop("amass") return data
[docs] def split_xyz(input_file, output_dir="res", output_prefix=None, in_memory=True): """ Split a multi-frame XYZ file into individual frame files. Parameters ---------- input_file : str Path to the input XYZ file containing multiple frames. The file should follow standard XYZ format: - Line 1: Number of atoms (integer) - Line 2: Comment line - Lines 3 to N+2: Atomic coordinates (element x y z ...) - Repeat for each frame output_dir : str, optional Directory where individual frame files will be saved. The directory will be created if it doesn't exist. Default is "res". output_prefix : str, optional Prefix for output filenames. If None, uses the input filename (without extension) as prefix. Output files are named as: {output_prefix}.{frame:0Nd}.xyz where N is the number of digits needed (e.g., 5 digits for 10000 frames). Default is None. in_memory : bool, optional If input file is too big to load into memory, set this parameter to False. Default is True. Returns ------- None Examples -------- >>> # Split trajectory.xyz into res/trajectory.00000.xyz, res/trajectory.00001.xyz, ... >>> split_xyz("trajectory.xyz") >>> # Specify custom output directory and prefix >>> split_xyz("trajectory.xyz", output_dir="frames", output_prefix="md") >>> # Output: frames/md.00000.xyz, frames/md.00001.xyz, ... >>> # For a file with 100000 frames, output will use 6 digits >>> split_xyz("large_traj.xyz") >>> # Output: res/large_traj.000000.xyz, ..., res/large_traj.099999.xyz Raises ------ RuntimeError If the input file cannot be opened or contains no valid frames. OSError If the output directory cannot be created. """ if output_prefix is None: output_prefix = os.path.splitext(os.path.basename(input_file))[0] if in_memory: _split.split_xyz(input_file, output_dir, output_prefix, get_num_threads()) else: with open(input_file, "r") as file: frame = 0 while True: line = file.readline() if not line: break n = int(line.strip()) output_file = os.path.join( output_dir, f"{output_prefix}.{frame:0>6d}.xyz" ) with open(output_file, "w") as out_file: out_file.write(line) for _ in range(n + 1): out_file.write(file.readline()) frame += 1
[docs] def generate_velocity(N, mass, temperature, remove_com=True, seed=None): """ Generate velocities from Maxwell-Boltzmann distribution at given temperature. Parameters ---------- N : int Number of atoms. mass : float or array_like Atomic mass in g/mol. Can be a scalar (all atoms same mass) or array of length N (different masses). temperature : float Target temperature in Kelvin. remove_com : bool, optional If True, remove center-of-mass velocity to ensure zero total momentum. Default is True. seed : int, optional Random seed for reproducibility. Default is None. Returns ------- vel : ndarray, shape (N, 3) Velocities in Å/fs. Notes ----- The velocity is generated from Maxwell-Boltzmann distribution: P(v) ∝ exp(-m*v²/(2*kb*T)) The standard deviation of velocity component is: σ_v = sqrt(kb*T/m) Units: - Temperature: K - Mass: g/mol - Velocity: Å/fs - kb = 1.380649e-23 J/K """ if seed is not None: np.random.seed(seed) # Convert mass to array if scalar mass = np.atleast_1d(mass) if mass.size == 1: mass = np.full(N, mass[0]) elif mass.size != N: raise ValueError(f"Mass array size {mass.size} doesn't match N={N}") # Physical constants kb = 1.380649e-23 # Boltzmann constant (J/K) afu = 6.022140857e23 # Avogadro's number (1/mol) # Convert mass from g/mol to kg mass_kg = mass / (afu * 1000.0) # kg # Standard deviation of velocity (m/s) # sigma_v = sqrt(kb*T/m) sigma_v = np.sqrt(kb * temperature / mass_kg) # m/s # Convert to Å/fs: 1 m/s = 1e-10 m / 1e-15 s = 1e-5 Å/fs sigma_v_Aps = sigma_v * 1e-5 # Å/fs # Generate velocities from normal distribution # Each component (vx, vy, vz) follows N(0, sigma_v²) vel = np.random.normal(0, sigma_v_Aps[:, np.newaxis], size=(N, 3)) # Remove center-of-mass velocity if remove_com: total_momentum = np.sum(vel * mass[:, np.newaxis], axis=0) total_mass = np.sum(mass) v_com = total_momentum / total_mass vel -= v_com return vel