# Copyright (c) 2022-2026, Yongchao Wu in Aalto University
# This file is from the mdapy project, released under the BSD 3-Clause License.
from __future__ import annotations
from typing import Tuple, Union, Optional, Iterable, Dict, TYPE_CHECKING, Any, List
import numpy as np
import polars as pl
import os
import tempfile
from pathlib import Path
from mdapy.box import Box
from mdapy.data import atomic_numbers, atomic_masses
import io
import re
import gzip
from mdapy.pigz import compress_file
if TYPE_CHECKING:
from ase import Atoms
from ovito.data import DataCollection
def _open_file(filename: str, mode: str = "r"):
"""
Smart file opener that handles both regular and gzip files.
Args:
filename: Path to file
mode: File mode ('r', 'rb', 'w', 'wb')
Returns:
File handle (automatically detects .gz)
"""
if filename.endswith(".gz"):
if "b" not in mode:
mode = mode + "t" # text mode for gzip
return gzip.open(filename, mode)
else:
return open(filename, mode)
def _is_uniform_single_space(lines: List[str], expected_ncols: int,
sample: int = 32) -> bool:
"""Return True iff every sampled line is exactly single-space-separated
with `expected_ncols` fields (no leading/trailing whitespace, no tabs,
no runs of multiple spaces).
Used as a cheap pre-flight before falling back to `pl.read_csv` (fast
path). When this returns False, callers should parse the block in
Python with `str.split()` (which handles arbitrary whitespace).
"""
if not lines:
return False
n = min(len(lines), sample)
for i in range(n):
raw = lines[i].rstrip("\r\n")
toks = raw.split()
if len(toks) != expected_ncols:
return False
# Reject tab, leading/trailing space, runs of multi-space.
if raw != " ".join(toks):
return False
return True
def _parse_dump_frame_impl(lines: List[str], source: str
) -> Tuple[pl.DataFrame, Box, Dict[str, Any]]:
"""Parse one LAMMPS dump frame from `lines` (the 9 header lines + N
atom rows). Pulled out of `BuildSystem.read_dump` so the multi-frame
Trajectory reader can reuse the exact same parsing logic.
"""
if len(lines) < 9:
raise ValueError(f"{source}: dump frame has only {len(lines)} lines (<9)")
try:
timestep = int(lines[1].strip())
except (IndexError, ValueError):
raise ValueError(f"{source}: malformed ITEM: TIMESTEP value")
try:
n_atoms = int(lines[3].strip())
except (IndexError, ValueError):
raise ValueError(f"{source}: malformed ITEM: NUMBER OF ATOMS value")
bb_line = lines[4].strip()
if not bb_line.startswith("ITEM: BOX BOUNDS"):
raise ValueError(f"{source}: expected 'ITEM: BOX BOUNDS' on line 5")
bb_tokens = bb_line.split()[3:]
if bb_tokens and all(t in {"pp", "ff", "ss", "mm"} for t in bb_tokens[-3:]):
boundary = [1 if t == "pp" else 0 for t in bb_tokens[-3:]]
geometry_tokens = bb_tokens[:-3]
else:
boundary = [1, 1, 1]
geometry_tokens = bb_tokens
bb_rows = [lines[5].split(), lines[6].split(), lines[7].split()]
if "abc" in geometry_tokens and "origin" in geometry_tokens:
avec = np.array(bb_rows[0][:3], dtype=np.float64)
bvec = np.array(bb_rows[1][:3], dtype=np.float64)
cvec = np.array(bb_rows[2][:3], dtype=np.float64)
origin = np.array(
[bb_rows[0][3], bb_rows[1][3], bb_rows[2][3]],
dtype=np.float64,
)
box = np.vstack([avec, bvec, cvec, origin])
elif "xy" in geometry_tokens and "xz" in geometry_tokens and "yz" in geometry_tokens:
xlo_b, xhi_b, xy = (float(bb_rows[0][0]), float(bb_rows[0][1]),
float(bb_rows[0][2]))
ylo_b, yhi_b, xz = (float(bb_rows[1][0]), float(bb_rows[1][1]),
float(bb_rows[1][2]))
zlo_b, zhi_b, yz = (float(bb_rows[2][0]), float(bb_rows[2][1]),
float(bb_rows[2][2]))
xlo = xlo_b - min(0.0, xy, xz, xy + xz)
xhi = xhi_b - max(0.0, xy, xz, xy + xz)
ylo = ylo_b - min(0.0, yz)
yhi = yhi_b - max(0.0, yz)
zlo, zhi = zlo_b, zhi_b
box = np.array([
[xhi - xlo, 0, 0 ],
[xy, yhi - ylo, 0 ],
[xz, yz, zhi - zlo],
[xlo, ylo, zlo ],
])
else:
xlo, xhi = float(bb_rows[0][0]), float(bb_rows[0][1])
ylo, yhi = float(bb_rows[1][0]), float(bb_rows[1][1])
zlo, zhi = float(bb_rows[2][0]), float(bb_rows[2][1])
box = np.array([
[xhi - xlo, 0, 0 ],
[0, yhi - ylo, 0 ],
[0, 0, zhi - zlo],
[xlo, ylo, zlo ],
])
atoms_header = lines[8].rstrip()
if not atoms_header.startswith("ITEM: ATOMS"):
raise ValueError(f"{source}: expected 'ITEM: ATOMS' on line 9")
col_names = atoms_header.split()[2:]
data_rows = lines[9 : 9 + n_atoms]
if len(data_rows) != n_atoms:
raise ValueError(
f"{source}: expected {n_atoms} atom rows, got {len(data_rows)}"
)
INT_COLS = {"id", "type", "ix", "iy", "iz", "mol", "proc", "procp1"}
STR_COLS = {"element", "typelabel"}
schema: Dict[str, "pl.DataType"] = {}
for name in col_names:
if name in INT_COLS:
schema[name] = pl.Int32
elif name in STR_COLS:
schema[name] = pl.Utf8
else:
schema[name] = pl.Float64
if _is_uniform_single_space(data_rows, len(col_names)):
buf = io.StringIO("".join(data_rows))
data = pl.read_csv(buf, separator=" ", schema=schema, has_header=False)
else:
cells = np.array([row.split()[: len(col_names)] for row in data_rows])
df_cols = {}
for j, name in enumerate(col_names):
col = cells[:, j]
if schema[name] == pl.Int32:
df_cols[name] = pl.Series(name, col.astype(np.int32),
dtype=pl.Int32)
elif schema[name] == pl.Utf8:
df_cols[name] = pl.Series(name, col.tolist(), dtype=pl.Utf8)
else:
df_cols[name] = pl.Series(name, col.astype(np.float64),
dtype=pl.Float64)
data = pl.DataFrame(df_cols)
# Coordinate-variant normalisation. Preference order: an explicit
# ``x y z`` triplet wins over scaled/unwrapped variants; the other
# columns are kept under their original names so the user can still
# access them. Only when ``x y z`` is missing do we promote
# ``xs ys zs``/``xsu ysu zsu``/``xu yu zu`` into ``x y z``.
cols = set(data.columns)
has_xyz = {"x", "y", "z"}.issubset(cols)
if not has_xyz:
if {"xs", "ys", "zs"}.issubset(cols) or {"xsu", "ysu", "zsu"}.issubset(cols):
tag = "xs" if {"xs", "ys", "zs"}.issubset(cols) else "xsu"
tag_y, tag_z = tag.replace("x", "y"), tag.replace("x", "z")
scaled = data.select(tag, tag_y, tag_z).to_numpy()
absolute = box[3] + scaled @ box[:3]
data = data.with_columns(
x=pl.Series(absolute[:, 0]),
y=pl.Series(absolute[:, 1]),
z=pl.Series(absolute[:, 2]),
).select(pl.all().exclude(tag, tag_y, tag_z))
elif {"xu", "yu", "zu"}.issubset(cols):
data = data.rename({"xu": "x", "yu": "y", "zu": "z"})
return data.rechunk(), Box(box, boundary), {"timestep": timestep}
def _build_xyz_properties(columns: List[str],
dtypes: List["pl.DataType"]) -> str:
"""Assemble the Properties=… token of an extended-XYZ comment line by
walking through the column list. Recognises the canonical 3-vector
aliases pos/vel/forces and folds *consecutive* {x,y,z}/{vx,vy,vz}/
{fx,fy,fz} runs into a single token. Multi-column user families with
a `<base>_0`, `<base>_1`, … `<base>_{N-1}` naming pattern are folded
into `<base>:T:N`. Everything else becomes a plain `<name>:T:1` token.
The output is independent of column order: a non-canonical layout
(e.g. element first, then a custom column, then x/y/z) still produces
a valid extended-XYZ Properties string because the folding only
triggers on contiguous runs of the canonical names.
"""
def _ptype(dt):
if dt.is_integer():
return "I"
if dt.is_float():
return "R"
if dt == pl.Utf8:
return "S"
raise ValueError(f"Unrecognised dtype {dt} in extended XYZ output")
THREE_ALIASES = [
(("x", "y", "z"), "pos", "R"),
(("xu", "yu", "zu"), "unwrapped_position", "R"),
(("vx", "vy", "vz"), "vel", "R"),
(("fx", "fy", "fz"), "forces", "R"),
]
tokens: List[str] = []
i = 0
while i < len(columns):
# Try the canonical 3-vector aliases first.
consumed = False
for triple, alias, want in THREE_ALIASES:
if columns[i : i + 3] == list(triple) and \
all(_ptype(dtypes[i + j]) == want for j in range(3)):
tokens.append(f"{alias}:{want}:3")
i += 3
consumed = True
break
if consumed:
continue
# element → species:S:1 (the conventional extended-XYZ name).
if columns[i] == "element":
tokens.append(f"species:{_ptype(dtypes[i])}:1")
i += 1
continue
# Detect a consecutive `<base>_0 _1 _2 ...` family of same dtype.
cur = columns[i]
base, sep, idx = cur.rpartition("_")
if sep and base and idx.isdigit() and int(idx) == 0:
run = 1
same = dtypes[i]
while i + run < len(columns):
nb, ns, ni = columns[i + run].rpartition("_")
if not ns or nb != base or not ni.isdigit() or int(ni) != run:
break
if dtypes[i + run] != same:
break
run += 1
if run >= 2:
tokens.append(f"{base}:{_ptype(same)}:{run}")
i += run
continue
tokens.append(f"{cur}:{_ptype(dtypes[i])}:1")
i += 1
return "Properties=" + ":".join(tokens)
def _parse_masses_to_elements(lines: List[str], n_types: int) -> Optional[Dict[int, str]]:
"""Parse a LAMMPS Masses block. Each row is `<type> <mass> [# <element>]`.
Return type→element mapping when EVERY row has a parseable element
comment; otherwise return None (the caller leaves out the `element`
column).
Used by ``read_data`` to round-trip through `write_data(..., element_list=...)`.
"""
mapping: Dict[int, str] = {}
parsed = 0
for raw in lines:
line = raw.strip()
if not line or line.startswith("#"):
continue
if "#" not in line:
continue
head, comment = line.split("#", 1)
head_toks = head.split()
if len(head_toks) < 2:
continue
try:
t = int(head_toks[0])
except ValueError:
continue
elem = comment.strip().split()[0] if comment.strip() else ""
if not elem:
continue
mapping[t] = elem
parsed += 1
if parsed >= n_types:
break
if parsed != n_types:
return None
return mapping
def _save_with_compression(
write_func, output_name: str, compress: bool = False, **kwargs
):
"""
Helper function to save file with optional compression.
Args:
write_func: Function that writes the actual file
output_name: Desired output filename
compress: Whether to compress the output
**kwargs: Arguments passed to write_func
"""
if compress:
# Place the temporary file in the destination directory rather
# than the system temp dir. This avoids:
# * cross-device rename failures inside compress_file when the
# destination volume differs from /tmp
# * Windows file-handle release races when reopening a tmp file
# in "wb" right after closing it in text mode
out_path = Path(output_name)
out_dir = out_path.parent if str(out_path.parent) else Path(".")
suffix = out_path.suffix or ".tmp"
# mkstemp returns (fd, path); close the fd immediately — we'll
# reopen via write_func in its own mode.
fd, tmp_path = tempfile.mkstemp(suffix=suffix,
prefix=".mdapy_tmp_",
dir=str(out_dir))
os.close(fd)
try:
write_func(tmp_path, **kwargs)
if not output_name.endswith(".gz"):
output_name = output_name + ".gz"
compress_file(tmp_path, output_name)
finally:
try:
os.remove(tmp_path)
except OSError:
pass
else:
# Direct write without compression
write_func(output_name, **kwargs)
[docs]
class BuildSystem:
[docs]
@classmethod
def from_file(
cls, filename: str, format: Optional[str] = None
) -> Tuple[pl.DataFrame, Box, Optional[Dict[str, Any]]]:
"""
Load system from a file (supports .gz compression).
Parameters
----------
filename : str
Path to the input file (can be .gz compressed).
format : str, optional
File format. If None, inferred from file extension.
Supported formats: 'data', 'lmp', 'dump', 'poscar', 'xyz', self difined mp.
Add '.gz' suffix for compressed files (e.g., 'data.gz', 'xyz.gz').
Returns
-------
Tuple[pl.DataFrame, mdapy.box.Box, Optional[Dict[str, Any]]]
DataFrame with atom data, simulation box, and optional global info.
"""
if format is None:
# Auto-detect format from filename
parts = os.path.basename(filename).split(".")
if parts[-1] == "gz":
# Handle .gz files
if len(parts) >= 2:
format = parts[-2]
else:
raise ValueError("Cannot infer format from filename")
else:
format = parts[-1]
format = format.lower()
# Validate format
supported_formats = ["data", "lmp", "dump", "poscar", "xyz", "mp"]
if format not in supported_formats:
raise ValueError(
f"Format '{format}' not supported. "
f"Supported formats: {supported_formats}"
)
# Route to appropriate reader
if format in ["dump", "dump.gz"]:
return cls.read_dump(filename)
elif format in ["data", "lmp"]:
return cls.read_data(filename)
elif format == "poscar":
return cls.read_poscar(filename)
elif format == "xyz":
return cls.read_xyz(filename)
elif format == "mp":
return cls.read_mp(filename)
[docs]
@staticmethod
def from_ovito(atom: "DataCollection") -> Tuple[pl.DataFrame, Box, Dict[str, Any]]:
"""
Load system from OVITO DataCollection.
Parameters
----------
atom : ovito.data.DataCollection
OVITO data collection object.
Returns
-------
Tuple[pl.DataFrame, mdapy.box.Box, Dict[str, Any]]
DataFrame with atom data, simulation box, and global info.
Raises
------
ImportError
If OVITO is not installed.
"""
try:
from ovito.data import DataCollection
if not isinstance(atom, DataCollection):
raise TypeError("Only accept an Ovito DataCollection object")
except ImportError:
raise ImportError(
"You must install Ovito first. "
"See https://www.ovito.org/manual/python/introduction/installation.html"
)
boundary = [1 if i else 0 for i in atom.cell.pbc]
box = Box(np.array(atom.cell[...]).T, boundary)
global_info = {}
for i, j in atom.attributes.items():
global_info[i] = j
data = {}
for i in atom.particles.keys():
arr = np.array(atom.particles[i][...])
if i in [
"Position",
"Particle Type",
"Particle Identifier",
"Velocity",
"Velocity Magnitude",
"Force",
]:
if i == "Position":
data["x"] = arr[:, 0]
data["y"] = arr[:, 1]
data["z"] = arr[:, 2]
if i == "Particle Type":
data["type"] = arr
if i == "Particle Identifier":
data["id"] = arr
if i == "Velocity":
data["vx"] = arr[:, 0]
data["vy"] = arr[:, 1]
data["vz"] = arr[:, 2]
if i == "Velocity Magnitude":
pass
if i == "Force":
data["fx"] = arr[:, 0]
data["fy"] = arr[:, 1]
data["fz"] = arr[:, 2]
else:
name = "".join(i.split())
if arr.ndim == 1:
data[name] = arr
else:
for j in range(arr.shape[1]):
data[f"{name}_{j}"] = arr[:, j]
data = pl.DataFrame(data)
# `particle_type` is only present when the source defined per-atom
# types (LAMMPS dumps, XYZ with species, etc.). A System built from
# raw positions has no type table — guard accordingly.
pt = getattr(atom.particles, "particle_type", None)
if pt is not None and "type" in data.columns:
type2element = {t.id: t.name for t in pt.types}
# Only attach an element column if every type has a real, non-empty
# name in the table (OVITO leaves "" for unnamed types).
if type2element and all(
isinstance(name, str) and len(name) > 0
for name in type2element.values()
):
data = data.with_columns(
pl.col("type")
.replace_strict(type2element)
.rechunk()
.alias("element")
)
return data.rechunk(), box, global_info
[docs]
@staticmethod
def from_ase(atom: "Atoms") -> Tuple[pl.DataFrame, Box]:
"""
Load system from ASE Atoms.
Parameters
----------
atom : ase.Atoms
ASE Atoms object.
Returns
-------
Tuple[pl.DataFrame, mdapy.box.Box]
DataFrame with atom data and simulation box.
Raises
------
ImportError
If ASE is not installed.
"""
try:
from ase import Atoms
if not isinstance(atom, Atoms):
raise TypeError("Only accept an ASE Atoms object")
except ImportError:
raise ImportError(
"You must install ASE first. See https://ase-lib.org/install.html"
)
box = np.array(atom.get_cell())
boundary = atom.get_pbc()
box = Box(box, boundary)
pos = atom.get_positions()
element = np.array(atom.get_chemical_symbols())
data = pl.from_numpy(
pos, schema={"x": pl.Float64, "y": pl.Float64, "z": pl.Float64}
).with_columns(element=element)
return data, box
[docs]
@staticmethod
def from_array(
pos: np.ndarray,
box: Union[int, float, Iterable[float], np.ndarray, Box],
) -> Tuple[pl.DataFrame, Box]:
"""
Create system from position array.
Parameters
----------
pos : np.ndarray
Atom positions (N x 3).
box : Union[int, float, Iterable[float], np.ndarray, mdapy.box.Box]
Simulation box definition.
Returns
-------
Tuple[pl.DataFrame, mdapy.box.Box]
DataFrame with positions and simulation box.
"""
if not isinstance(pos, np.ndarray):
raise TypeError("pos must be numpy array")
if pos.ndim != 2 or pos.shape[1] != 3:
raise ValueError("pos must be N x 3 array")
box = Box(box)
data = pl.from_numpy(
pos, schema={"x": pl.Float64, "y": pl.Float64, "z": pl.Float64}
)
return data, box
[docs]
@staticmethod
def from_data(
data: pl.DataFrame,
box: Union[int, float, Iterable[float], np.ndarray, Box],
) -> Tuple[pl.DataFrame, Box]:
"""
Create system from DataFrame.
Parameters
----------
data : pl.DataFrame
DataFrame with at least 'x', 'y', 'z' columns.
box : Union[int, float, Iterable[float], np.ndarray, mdapy.box.Box]
Simulation box definition.
Returns
-------
Tuple[pl.DataFrame, mdapy.box.Box]
Input DataFrame and simulation box.
"""
for i in ["x", "y", "z"]:
if i not in data.columns:
raise ValueError(f"Data must contain {i} column")
data = data.with_columns(
pl.col("x").cast(pl.Float64),
pl.col("y").cast(pl.Float64),
pl.col("z").cast(pl.Float64),
)
box = Box(box)
return data, box
[docs]
@staticmethod
def read_mp(filename: str) -> Tuple[pl.DataFrame, Box, Optional[Dict[str, Any]]]:
"""
Read system from mp file.
Parameters
----------
filename : str
Path to mp file, such as model.mp.
Returns
-------
Tuple[pl.DataFrame, mdapy.box.Box, Optional[Dict[str, Any]]]
DataFrame with atom data, simulation box, and global info.
"""
meta_data = pl.read_parquet_metadata(filename)
data = pl.read_parquet(filename, rechunk=True)
if "box" in meta_data:
box = np.array(meta_data["box"].split(), float).reshape(3, 3)
else:
# No stored box → infer the bounding box of the atom cloud.
# Pad zero-extent axes (e.g. 2D layer) to keep the cell
# invertible — Box() rejects singular matrices.
coor = data.select("x", "y", "z")
extents = (coor.max() - coor.min()).to_numpy().flatten()
extents = np.where(extents > 0, extents, 1e-9)
box = np.diag(extents)
origin = None
boundary = None
if "origin" in meta_data:
origin = np.array(meta_data["origin"].split(), float)
if "boundary" in meta_data:
boundary = np.array(meta_data["boundary"].split(), np.int32)
box = Box(box, boundary, origin)
# Pass through user-set global_info keys; skip parquet/arrow internals.
SKIP = {"box", "origin", "boundary"}
global_info = {
k: v for k, v in meta_data.items()
if k not in SKIP and not k.startswith("ARROW")
}
return data, box, global_info
[docs]
@staticmethod
def read_xyz(filename: str) -> Tuple[pl.DataFrame, Box, Optional[Dict[str, Any]]]:
"""
Read system from XYZ file (supports .gz compression).
Parameters
----------
filename : str
Path to XYZ file (can be .xyz.gz).
Returns
-------
(pl.DataFrame, mdapy.box.Box, dict)
Per-atom data, simulation cell, and the trailing key=value
metadata from the comment line as a dict (with `lattice`,
`properties`, `pbc`, `origin` removed since they're already
represented in the box / column schema).
Notes
-----
Both classical (legacy) and extended XYZ are supported. The
extended-XYZ ``properties=`` string is parsed token by token,
supporting the conventional aliases:
* ``pos:R:3`` → ``x y z``
* ``species:S:1`` / ``element:S:1`` → ``element``
* ``vel:R:3`` / ``velo:R:3`` → ``vx vy vz``
* ``forces:R:3`` / ``force:R:3`` → ``fx fy fz``
Multi-column user-defined fields (e.g. ``custom:R:5``) become
``custom_0`` … ``custom_4``.
Performance: when the per-atom rows are uniformly single-space
separated, parsing dispatches to ``pl.read_csv`` (fast, vectorised);
otherwise it falls back to a Python ``str.split()`` pass that
tolerates tabs, runs of spaces and trailing whitespace.
"""
# ------------------------------------------------------------------
# 1. Slurp the entire file. XYZ files for a single frame are
# typically small; the savings of streaming aren't worth the
# extra branching.
# ------------------------------------------------------------------
with _open_file(filename, "r") as op:
lines = op.readlines()
if len(lines) < 2:
raise ValueError(f"{filename}: too short to be an XYZ file")
natom = int(lines[0].strip())
if natom < 0:
raise ValueError(f"{filename}: negative atom count {natom}")
if len(lines) < 2 + natom:
raise ValueError(
f"{filename}: header says {natom} atoms but only "
f"{len(lines) - 2} body lines present"
)
# Strip the trailing newline (and CRLF on Windows) before regex
# parsing, otherwise unquoted tokens at end-of-line capture the
# newline and downstream string compares break.
comment = lines[1].rstrip("\r\n")
global_info: Dict[str, Any] = {}
# Capture both quoted and unquoted key=value pairs.
for match in re.findall(
r'(\w+)=(?:"([^"]+)"|([^ ]+))', comment.replace("'", '"')
):
key = match[0].lower()
global_info[key] = match[1] if match[1] else match[2]
classical = "lattice" not in global_info
# ------------------------------------------------------------------
# 2. Build column list + per-column dtype.
# ------------------------------------------------------------------
if classical:
boundary = [0, 0, 0]
columns = ["element", "x", "y", "z"]
schema = {
"element": pl.Utf8,
"x": pl.Float64,
"y": pl.Float64,
"z": pl.Float64,
}
else:
if "properties" not in global_info:
raise ValueError(
f"{filename}: extended XYZ must contain a 'properties=' field"
)
# Boundary: pbc="T T F" / pbc="1 1 0" / etc. Default fully PBC.
boundary = [1, 1, 1]
if "pbc" in global_info:
boundary = [
1 if t in ("T", "1") else 0
for t in global_info["pbc"].split()
]
# Box: 9 reals = a/b/c stacked rows.
box_mat = np.array(global_info["lattice"].split(), float).reshape(3, 3)
origin = np.zeros(3, float)
if "origin" in global_info:
origin = np.array(global_info["origin"].split(), float)
box = np.vstack([box_mat, origin.reshape(1, 3)])
content = global_info["properties"].strip().split(":")
columns, schema = [], {}
DTYPE_MAP = {"S": pl.Utf8, "R": pl.Float64, "I": pl.Int32}
i = 0
while i + 2 < len(content):
name, ptype, n_col_str = content[i], content[i + 1], content[i + 2]
if ptype not in DTYPE_MAP:
raise ValueError(f"{filename}: unrecognised XYZ type {ptype!r}")
dtype = DTYPE_MAP[ptype]
n_col = int(n_col_str)
# Convention aliases (well-known per ASE / OVITO).
# If the canonical alias names are already taken (the
# Properties string mentions e.g. both `force:R:3` and
# `forces:R:3`), the *second* occurrence falls through
# to the generic ``<name>_<j>`` path so every column
# stays unique and the data row split lines up.
if (name == "pos" and ptype == "R" and n_col == 3
and "x" not in schema):
sub = ["x", "y", "z"]
elif (name in ("unwrapped_position", "unwrapped_pos")
and ptype == "R" and n_col == 3
and "xu" not in schema):
# GPUMD writes "unwrapped_position:R:3"; map to the
# LAMMPS-style xu/yu/zu so the rest of the toolchain
# (unwrap_trajectory, MSD, ...) sees a uniform name.
sub = ["xu", "yu", "zu"]
elif (name in ("species", "element") and ptype == "S"
and n_col == 1 and "element" not in schema):
sub = ["element"]
elif (name in ("vel", "velo") and ptype == "R" and n_col == 3
and "vx" not in schema):
sub = ["vx", "vy", "vz"]
elif (name in ("force", "forces") and ptype == "R" and n_col == 3
and "fx" not in schema):
sub = ["fx", "fy", "fz"]
else:
sub = [name] if n_col == 1 else [f"{name}_{j}" for j in range(n_col)]
for c in sub:
# Defensive uniqueness: in the rare case the
# generic name collides too, append a suffix.
base = c
suffix = 0
while c in schema:
suffix += 1
c = f"{base}__{suffix}"
columns.append(c)
schema[c] = dtype
i += 3
# ------------------------------------------------------------------
# 3. Parse the per-atom block.
# Fast path: clean single-space separator → pl.read_csv on a
# StringIO of the body. Slow path: numpy split, robust to runs
# of whitespace, tabs, CRLF.
# ------------------------------------------------------------------
body = lines[2 : 2 + natom]
if _is_uniform_single_space(body, len(columns)):
buf = io.StringIO("".join(body))
df = pl.read_csv(
buf,
separator=" ",
schema=schema,
has_header=False,
)
else:
cells = np.array([row.split()[: len(columns)] for row in body])
df_cols = {}
for j, c in enumerate(columns):
col = cells[:, j]
if schema[c] == pl.Int32:
df_cols[c] = pl.Series(c, col.astype(np.int32), dtype=pl.Int32)
elif schema[c] == pl.Utf8:
df_cols[c] = pl.Series(c, col.tolist(), dtype=pl.Utf8)
else:
df_cols[c] = pl.Series(c, col.astype(np.float64),
dtype=pl.Float64)
df = pl.DataFrame(df_cols)
# Classical XYZ: infer the bounding box from the data. If any axis
# has zero extent (e.g. 2D layer at z=const), pad it slightly so the
# cell matrix stays invertible — Box() rejects singular matrices.
if classical:
coor = df.select("x", "y", "z")
mins = coor.min().to_numpy().flatten()
extents = (coor.max() - coor.min()).to_numpy().flatten()
extents = np.where(extents > 0, extents, 1e-9)
box = np.vstack([np.diag(extents), mins])
# Strip metadata keys we've already consumed.
for key in ("pbc", "properties", "origin", "lattice"):
global_info.pop(key, None)
return df.rechunk(), Box(box, boundary), global_info
[docs]
@staticmethod
def read_poscar(
filename: str,
) -> Tuple[pl.DataFrame, Box, Optional[Dict[str, Any]]]:
"""
Read a VASP POSCAR file (supports ``.gz`` compression).
Format (line indices below are 0-based):
- line 0: free-form comment.
- line 1: universal scale factor (multiplies both the lattice
and the Cartesian atom positions).
- lines 2–4: three lattice vectors ``a``, ``b``, ``c`` (one per
row).
- line 5: element-symbol line (e.g. ``"Al Cu"``) *or* the
per-species count line (e.g. ``"1 1"``) — distinguished by
whether the first non-whitespace char is alphabetic vs.
numeric.
- line 6: per-species count line (only when line 5 is symbols).
- optional ``Selective dynamics`` header (first non-blank char
``S`` / ``s``).
- coordinate-type line: first non-blank char in
``{D, d}`` → Direct, ``{C, c, K, k}`` → Cartesian / K-point.
- ``N`` atom rows (3 floats each, plus 3 T/F flags when SD is
on).
- optional lattice-velocity / ion-velocity blocks.
Returns
-------
Tuple[pl.DataFrame, mdapy.box.Box, dict]
"""
global_info: Dict[str, Any] = {}
with _open_file(filename, "r") as op:
file = op.readlines()
if len(file) < 7:
raise ValueError(f"{filename}: too short to be a POSCAR")
# ------------------------------------------------------------------
# Lines 0–4: comment + scale + lattice vectors.
# ------------------------------------------------------------------
global_info["comment"] = file[0].strip()
scale = float(file[1].strip())
box = np.array([file[2].split(), file[3].split(), file[4].split()],
dtype=float) * scale
# ------------------------------------------------------------------
# Lines 5–6: optional element symbols + per-species counts.
# ------------------------------------------------------------------
line5 = file[5].split()
if line5 and line5[0][0].isalpha():
symbols = line5
counts = [int(c) for c in file[6].split()]
row = 7
else:
symbols = None
counts = [int(c) for c in line5]
row = 6
natoms = sum(counts)
if symbols is not None:
if len(symbols) != len(counts):
raise ValueError(
f"{filename}: {len(symbols)} symbols but {len(counts)} counts"
)
element_list = [s for s, n in zip(symbols, counts) for _ in range(n)]
type_list = []
else:
type_list = [t for t, n in enumerate(counts, start=1) for _ in range(n)]
element_list = []
# ------------------------------------------------------------------
# Optional "Selective dynamics" header (first non-blank char S/s).
# ------------------------------------------------------------------
selective_dynamics = False
if file[row].lstrip().startswith(("S", "s")):
selective_dynamics = True
row += 1
global_info["selective_dynamics"] = selective_dynamics
# ------------------------------------------------------------------
# Coordinate-type line. Use lstrip() — real-world POSCARs often
# have leading whitespace before "Cartesian"/"Direct".
# ------------------------------------------------------------------
coord_char = file[row].lstrip()[:1]
is_cartesian = coord_char in ("C", "c", "K", "k")
row += 1
# ------------------------------------------------------------------
# N atom rows.
# ------------------------------------------------------------------
atom_rows = file[row : row + natoms]
if len(atom_rows) != natoms:
raise ValueError(
f"{filename}: expected {natoms} atom rows, got {len(atom_rows)}"
)
if selective_dynamics:
pos = np.array([r.split()[:3] for r in atom_rows], dtype=float)
sd = np.array([r.split()[3:6] for r in atom_rows])
else:
pos = np.array([r.split()[:3] for r in atom_rows], dtype=float)
sd = np.empty((0, 3))
row += natoms
# Convert positions to absolute Cartesian. Per the VASP spec the
# universal scale factor multiplies BOTH the lattice and the
# Cartesian coordinates.
if is_cartesian:
pos = pos * scale
else:
pos = pos @ box
# ------------------------------------------------------------------
# Optional lattice-velocity block (6 rows after a header line).
# ------------------------------------------------------------------
if row < len(file) and file[row].lstrip().startswith(("L", "l")):
global_info["initialization_state"] = file[row + 1].strip()
global_info["lattice_velocity"] = np.array(
[line.split()[:3] for line in file[row + 2 : row + 8]],
dtype=float,
)
row += 8
# ------------------------------------------------------------------
# Optional ion-velocity block. Header is the coord-type line
# (Cartesian/Direct), then N rows.
# ------------------------------------------------------------------
vel = []
if row + 1 + natoms <= len(file):
vel = np.array(
[r.split() for r in file[row + 1 : row + 1 + natoms]],
dtype=float,
)
if vel.shape[0] == natoms:
vel_char = file[row].lstrip()[:1] if file[row].strip() else "C"
if vel_char not in ("C", "c", "K", "k"):
vel = vel @ box
# Build DataFrame
data = {}
schema = {}
if len(element_list) == natoms:
data["element"] = element_list
schema["element"] = pl.Utf8
else:
if len(type_list) != natoms:
raise ValueError("Atom count mismatch")
data["type"] = type_list
schema["type"] = pl.Int32
data["x"] = pos[:, 0]
data["y"] = pos[:, 1]
data["z"] = pos[:, 2]
for coord in ["x", "y", "z"]:
schema[coord] = pl.Float64
if len(sd) > 0:
data["sdx"] = sd[:, 0]
data["sdy"] = sd[:, 1]
data["sdz"] = sd[:, 2]
for sd_coord in ["sdx", "sdy", "sdz"]:
schema[sd_coord] = pl.Utf8
if len(vel) > 0:
data["vx"] = vel[:, 0]
data["vy"] = vel[:, 1]
data["vz"] = vel[:, 2]
for vel_coord in ["vx", "vy", "vz"]:
schema[vel_coord] = pl.Float64
return pl.DataFrame(data, schema).rechunk(), Box(box), global_info
[docs]
@staticmethod
def read_data(filename: str) -> Tuple[pl.DataFrame, Box, Optional[Dict[str, Any]]]:
"""
Read a single-frame LAMMPS data file (supports .gz compression).
Only ``atom_style atomic`` and ``atom_style charge`` are supported.
Other styles (bond, molecular, full, …) raise NotImplementedError.
The optional image-flag columns ``nx ny nz`` are preserved as
``ix iy iz`` int columns alongside the wrapped ``x y z``.
Parameters
----------
filename : str
Path to data file (can be ``.gz``).
Returns
-------
(pl.DataFrame, mdapy.box.Box, dict)
Per-atom data, simulation cell, and an empty global-info dict.
"""
# ------------------------------------------------------------------
# 1. Slurp the whole file into a list of lines. Data files are
# typically small enough that this is fine, and a single in-memory
# pass is far simpler than two-pass with offsets.
# ------------------------------------------------------------------
with _open_file(filename, "r") as op:
lines = op.readlines()
# Body section keywords we care about. Any line whose first token is
# in this set marks the start of a section. (Sections we don't read
# we still need to know about so we can skip them.)
BODY_SECTIONS = {
"Atoms", "Velocities", "Masses",
# Forbidden / unsupported sections — presence means the file
# uses a richer atom_style than atomic/charge.
"Bonds", "Angles", "Dihedrals", "Impropers",
"Bond", "Angle", "Dihedral", "Improper", # *Coeffs prefixes
"Pair", "PairIJ",
"Ellipsoids", "Lines", "Triangles", "Bodies",
"Atom", "Bond", # Type Labels
}
FORBIDDEN_HINTS = ("bond", "angle", "dihedral", "improper",
"molecular", "ellipsoid")
# ------------------------------------------------------------------
# 2. Find the first section header (= end of the header block).
# Scan looking for a line whose stripped first token starts with
# a capital and is in BODY_SECTIONS.
# ------------------------------------------------------------------
section_starts = {} # name -> line index of the header
for i, raw in enumerate(lines):
stripped = raw.strip()
if not stripped or stripped.startswith("#"):
continue
first_tok = stripped.split()[0]
if first_tok in BODY_SECTIONS:
section_starts.setdefault(first_tok, i)
if "Atoms" not in section_starts:
raise ValueError(f"{filename}: no 'Atoms' section found")
header_lines = lines[: section_starts["Atoms"]]
# ------------------------------------------------------------------
# 3. Parse the header for N, n_types, box, atom_style hints.
# ------------------------------------------------------------------
N = 0
n_types = 0
xlo = xhi = ylo = yhi = zlo = zhi = 0.0
xy = xz = yz = 0.0
for raw in header_lines:
tokens = raw.split()
if not tokens:
continue
tail = tokens[-1]
try:
if tail == "atoms":
N = int(tokens[0])
elif tail == "types" and len(tokens) >= 2 and tokens[1] == "atom":
n_types = int(tokens[0])
elif tail == "xhi":
xlo, xhi = float(tokens[0]), float(tokens[1])
elif tail == "yhi":
ylo, yhi = float(tokens[0]), float(tokens[1])
elif tail == "zhi":
zlo, zhi = float(tokens[0]), float(tokens[1])
elif tail == "yz":
xy, xz, yz = (float(tokens[0]), float(tokens[1]),
float(tokens[2]))
except (IndexError, ValueError):
# Header lines with the wrong shape are skipped silently —
# `Atoms` is the real anchor.
pass
# Guard against unsupported atom_style hints (header keywords
# like '5 bond types' / '... bonds' / '... angles' / etc.).
if len(tokens) >= 2 and any(h in tokens[1].lower() for h in FORBIDDEN_HINTS):
raise NotImplementedError(
f"{filename}: only atom_style atomic / charge are supported "
f"(found header keyword '{tokens[1]}')"
)
box = np.array([
[xhi - xlo, 0, 0 ],
[xy, yhi - ylo, 0 ],
[xz, yz, zhi - zlo],
[xlo, ylo, zlo ],
])
# ------------------------------------------------------------------
# 4. Locate the data block under `Atoms`. The header line may carry
# a trailing comment ("Atoms # atomic" / "Atoms" / "Atoms # foo").
# Detect atom_style robustly:
#
# * explicit "# atomic" / "# charge" → use directly
# * unrecognised comment → infer from column count
# * no comment → infer from column count
#
# Unsupported styles named explicitly (bond/full/molecular/…)
# raise NotImplementedError.
# ------------------------------------------------------------------
atoms_header_idx = section_starts["Atoms"]
atoms_header = lines[atoms_header_idx]
atom_style_hint = None
if "#" in atoms_header:
atom_style_hint = atoms_header.split("#", 1)[1].strip().split()[0].lower()
if atom_style_hint and atom_style_hint not in ("atomic", "charge"):
raise NotImplementedError(
f"{filename}: atom_style '{atom_style_hint}' not supported "
f"(only atomic / charge)"
)
# First data line: skip blanks/comments after the `Atoms` header.
first_data_idx = atoms_header_idx + 1
while first_data_idx < len(lines):
stripped = lines[first_data_idx].strip()
if stripped and not stripped.startswith("#"):
break
first_data_idx += 1
if first_data_idx >= len(lines):
raise ValueError(f"{filename}: 'Atoms' section is empty")
first_cols = lines[first_data_idx].split()
n_cols = len(first_cols)
# Match column count to atom style. Image flags `nx ny nz` add 3.
# Allowed shapes:
# atomic: 5 (id type x y z) or 8 (+ ix iy iz)
# charge: 6 (id type q x y z) or 9 (+ ix iy iz)
if atom_style_hint == "atomic":
valid = (5, 8)
elif atom_style_hint == "charge":
valid = (6, 9)
else:
# Infer from column count.
valid = None
if valid is None:
if n_cols in (5, 8):
style = "atomic"
elif n_cols in (6, 9):
style = "charge"
else:
raise NotImplementedError(
f"{filename}: cannot infer atom_style from {n_cols}-column "
f"Atoms data (only atomic / charge are supported)"
)
else:
if n_cols not in valid:
raise ValueError(
f"{filename}: atom_style {atom_style_hint!r} expects "
f"{valid} columns, got {n_cols}"
)
style = atom_style_hint
if style == "atomic":
col_names = ["id", "type", "x", "y", "z"]
else: # charge
col_names = ["id", "type", "q", "x", "y", "z"]
has_image_flags = (n_cols == len(col_names) + 3)
if has_image_flags:
col_names += ["ix", "iy", "iz"]
# ------------------------------------------------------------------
# 5. Parse the per-atom block.
#
# Fast path: when every row is a clean single-space-separated
# record with the expected column count, use polars' CSV reader
# (fast, vectorised). Otherwise fall back to a numpy split-based
# parser (robust to runs of whitespace, mixed tabs, comments).
# ------------------------------------------------------------------
data_block = lines[first_data_idx : first_data_idx + N]
if len(data_block) != N:
raise ValueError(
f"{filename}: expected {N} atom rows, found {len(data_block)}"
)
# Per-column dtype for atomic / charge.
schema: Dict[str, "pl.DataType"] = {}
for c in col_names:
if c in ("id", "type", "ix", "iy", "iz"):
schema[c] = pl.Int32
else:
schema[c] = pl.Float64
if _is_uniform_single_space(data_block, len(col_names)):
# Fast path — feed just the data block to polars so its CSV
# reader can't accidentally scan past it (n_rows alone doesn't
# cap the file's-end-of-block detection on every Polars version).
buf = io.StringIO("".join(data_block))
data = pl.read_csv(
buf,
separator=" ",
schema=schema,
has_header=False,
)
else:
# Slow path — Python split is robust to multi-space / tab.
atoms_arr = np.array([row.split()[: len(col_names)] for row in data_block])
cols = {}
for i, c in enumerate(col_names):
col = atoms_arr[:, i]
if schema[c] == pl.Int32:
cols[c] = pl.Series(c, col.astype(np.int32), dtype=pl.Int32)
else:
cols[c] = pl.Series(c, col.astype(np.float64), dtype=pl.Float64)
data = pl.DataFrame(cols)
# ------------------------------------------------------------------
# 6. If a `Velocities` section exists, parse it and join.
# ------------------------------------------------------------------
if "Velocities" in section_starts:
vh = section_starts["Velocities"]
first_vel_idx = vh + 1
while first_vel_idx < len(lines):
stripped = lines[first_vel_idx].strip()
if stripped and not stripped.startswith("#"):
break
first_vel_idx += 1
vel_block = lines[first_vel_idx : first_vel_idx + N]
if len(vel_block) == N:
# Velocity rows have 4 cols: atom-id vx vy vz.
if _is_uniform_single_space(vel_block, 4):
buf = io.StringIO("".join(vel_block))
vdf = pl.read_csv(
buf,
separator=" ",
schema={"id": pl.Int32, "vx": pl.Float64,
"vy": pl.Float64, "vz": pl.Float64},
has_header=False,
)
vel_ids = vdf["id"].to_numpy()
vel_arr = vdf.select("vx", "vy", "vz").to_numpy()
else:
vel_arr = np.array([row.split()[1:4] for row in vel_block],
dtype=np.float64)
vel_ids = np.array([int(row.split()[0]) for row in vel_block])
id_to_vel = dict(zip(vel_ids, vel_arr))
ordered = np.array([id_to_vel[a] for a in data["id"].to_list()],
dtype=np.float64)
data = data.with_columns(
vx=ordered[:, 0], vy=ordered[:, 1], vz=ordered[:, 2],
)
# ------------------------------------------------------------------
# 7. If the Masses section carries `# Element` comments for every
# atom type, attach an `element` column. This round-trips with
# `write_data(..., element_list=...)`.
# ------------------------------------------------------------------
if "Masses" in section_starts and n_types > 0:
mh = section_starts["Masses"]
first_mass_idx = mh + 1
while first_mass_idx < len(lines):
stripped = lines[first_mass_idx].strip()
if stripped and not stripped.startswith("#"):
break
first_mass_idx += 1
mass_block = lines[first_mass_idx : first_mass_idx + n_types]
type2elem = _parse_masses_to_elements(mass_block, n_types)
if type2elem is not None:
# Use polars' replace_strict for vectorised mapping.
data = data.with_columns(
pl.col("type").replace_strict(type2elem).alias("element")
)
return data.rechunk(), Box(box, [1, 1, 1]), {}
[docs]
@staticmethod
def parse_dump_frame(lines: List[str],
source: str = "<dump>"
) -> Tuple[pl.DataFrame, Box, Dict[str, Any]]:
"""
Parse one frame's worth of LAMMPS dump lines (the 9 header lines
+ N atom rows). Used by both :py:meth:`read_dump` (single-frame)
and :class:`mdapy.Trajectory` (multi-frame).
"""
return _parse_dump_frame_impl(lines, source)
[docs]
@staticmethod
def read_dump(filename: str) -> Tuple[pl.DataFrame, Box, Dict[str, Any]]:
"""
Read a single-frame LAMMPS dump file (text style, supports .gz).
Multi-frame dumps are rejected with a clear error — use
:class:`mdapy.Trajectory` for those.
Coordinate column variants understood (see LAMMPS dump custom docs):
* ``x y z`` wrapped Cartesian → kept as ``x y z``
* ``xs ys zs`` scaled (fractional) → unscaled to Cartesian
* ``xu yu zu`` unwrapped Cartesian → kept as ``x y z``
* ``xsu ysu zsu`` scaled-unwrapped → unscaled to Cartesian
Image flags ``ix iy iz`` and the optional ``element`` column from
``dump_modify ... element ...`` are preserved verbatim.
BOX BOUNDS header forms recognised:
* ``ITEM: BOX BOUNDS xx yy zz`` — orthogonal
* ``ITEM: BOX BOUNDS xy xz yz xx yy zz`` — restricted triclinic
* ``ITEM: BOX BOUNDS abc origin`` — general triclinic (newer LAMMPS)
"""
with _open_file(filename, "r") as op:
lines = op.readlines()
# Locate every "ITEM: TIMESTEP" — there must be exactly one.
ts_indices = [i for i, l in enumerate(lines)
if l.strip().startswith("ITEM: TIMESTEP")]
if not ts_indices:
raise ValueError(f"{filename}: no 'ITEM: TIMESTEP' header found")
if len(ts_indices) > 1:
raise ValueError(
f"{filename}: multi-frame dump file (found {len(ts_indices)} "
f"frames). Use mdapy.Trajectory or split the file first."
)
return _parse_dump_frame_impl(lines, source=filename)
[docs]
class SaveSystem:
[docs]
@staticmethod
def to_ase(data: pl.DataFrame, box: Box) -> "Atoms":
"""
Convert system to ASE Atoms object.
Parameters
----------
data : pl.DataFrame
Particle data with at least 'x', 'y', 'z', 'element' columns.
box : mdapy.box.Box
Simulation box.
Returns
-------
ase.Atoms
ASE Atoms object.
Raises
------
ImportError
If ASE is not installed.
ValueError
If required columns are missing.
"""
try:
from ase import Atoms
except ImportError:
raise ImportError(
"You must install ASE first. "
"See https://wiki.fysik.dtu.dk/ase/install.html"
)
# Check required columns
for col in ["x", "y", "z", "element"]:
if col not in data.columns:
raise ValueError(f"data must contain {col} column")
# Get positions
positions = data.select("x", "y", "z").to_numpy()
symbols = data["element"].to_list()
# Create ASE Atoms object
atoms = Atoms(
symbols=symbols,
positions=positions,
cell=box.box,
pbc=[bool(p) for p in box.boundary],
)
# Add velocities if available
if all(col in data.columns for col in ["vx", "vy", "vz"]):
velocities = data.select("vx", "vy", "vz").to_numpy()
atoms.set_velocities(velocities)
return atoms
[docs]
@staticmethod
def to_ovito(
data: pl.DataFrame, box: Box, global_info: Optional[Dict[str, Any]] = None
) -> "DataCollection":
"""
Save system to OVITO DataCollection.
Parameters
----------
data : pl.DataFrame
Particle informations.
box : Box
Simulation cell.
global_info : Optional[Dict[str, Any]], optional
Global attributes to add.
Returns
-------
ovito.data.DataCollection
OVITO data collection object.
Raises
------
ImportError
If OVITO is not installed.
"""
try:
from ovito.data import DataCollection
except ImportError:
raise ImportError(
"You must install Ovito first. "
"See https://www.ovito.org/manual/python/introduction/installation.html"
)
for i in ["x", "y", "z"]:
if i not in data.columns:
raise ValueError(f"data must contain {i} column")
data_collection = DataCollection()
cell = data_collection.create_cell(
matrix=box.box.T, pbc=[bool(p) for p in box.boundary]
)
cell[:, 3] = box.origin
particles = data_collection.create_particles(count=data.shape[0])
particles.create_property(
"Position", data=data.select("x", "y", "z").to_numpy()
)
if "element" in data.columns:
types = particles.create_property("Particle Type")
symbols = data["element"]
with types as tarray:
for i, sym in enumerate(symbols):
tarray[i] = types.add_type_name(sym, particles).id
elif "type" in data.columns:
particles.create_property("Particle Type", data=data["type"].to_numpy())
else:
particles.create_property(
"Particle Type", data=np.ones(data.shape[0], np.int32)
)
# Create user-defined properties
if all(col in data.columns for col in ["vx", "vy", "vz"]):
particles.create_property(
"Velocity", data=data.select("vx", "vy", "vz").to_numpy()
)
if all(col in data.columns for col in ["fx", "fy", "fz"]):
particles.create_property(
"Force", data=data.select("fx", "fy", "fz").to_numpy()
)
for name in data.columns:
if name in [
"x",
"y",
"z",
"element",
"type",
"vx",
"vy",
"vz",
"fx",
"fy",
"fz",
]:
continue
try:
particles.create_property(name, data=data[name])
except Exception:
pass
if global_info:
for key, value in global_info.items():
try:
data_collection.attributes[key] = value
except Exception:
pass
return data_collection
[docs]
def write_mp(
output_name: str, data: pl.DataFrame, box: Box, global_info: Dict[str, str]
) -> None:
"""
Write system to mp file.
Parameters
----------
output_name : str
Output file path.
data : pl.DataFrame
Atom data with 'x', 'y', 'z', 'element'.
box : Box
Box information.
global_info : Dict
Global information.
"""
metadata = {}
metadata["box"] = " ".join(box.box.astype(str).flatten().tolist())
metadata["origin"] = " ".join(box.origin.astype(str).tolist())
metadata["boundary"] = " ".join(box.boundary.astype(str).tolist())
for i in global_info.keys():
if i not in ["box", "origin", "boundary"] and i in [
"energy",
"stress",
"virial",
"timestep",
]:
metadata[str(i)] = str(global_info[i])
data.write_parquet(output_name, metadata=metadata)
[docs]
@staticmethod
def write_xyz(
output_name: str,
box: Box,
data: pl.DataFrame,
classical: bool = False,
compress: bool = False,
**kargs,
):
"""
Write system to XYZ file (with optional compression).
Parameters
----------
output_name : str
Output file path.
box : mdapy.box.Box
Simulation box.
data : pl.DataFrame
Atom data with 'x', 'y', 'z', 'element'.
classical : bool, optional
Use classical XYZ format.
compress : bool, optional
Compress output to .gz format.
**kargs
Additional key-value pairs for extended XYZ.
"""
for col in ["x", "y", "z", "element"]:
if col not in data.columns:
raise ValueError(f"data must contain {col} column")
def _write_xyz_internal(filename, box, data, classical, **kargs):
if classical:
with open(filename, "wb") as op:
op.write(f"{data.shape[0]}\n".encode())
op.write("Classical XYZ file written by mdapy.\n".encode())
data.select("element", "x", "y", "z").write_csv(
op, separator=" ", include_header=False,
float_precision=10,
)
else:
# Token-by-token Properties string assembly. Independent of
# column order — folding only triggers on contiguous runs
# of canonical names (x/y/z, vx/vy/vz, fx/fy/fz, base_0/1/…).
properties_str = _build_xyz_properties(data.columns,
list(data.dtypes))
# Build the comment line with Lattice / Properties /
# pbc / Origin (the four well-known extended-XYZ keys).
lattice_str = (
'Lattice="' + " ".join(box.box.flatten().astype(str).tolist())
+ '"'
)
pbc_str = (
'pbc="'
+ " ".join("T" if i == 1 else "F" for i in box.boundary)
+ '"'
)
origin_str = (
'Origin="' + " ".join(box.origin.astype(str).tolist()) + '"'
)
comments = f"{lattice_str} {properties_str} {pbc_str} {origin_str}"
for key, value in kargs.items():
if key.lower() not in ("lattice", "pbc", "properties", "origin"):
try:
if key.lower() in ("virial", "bec", "stress"):
comments += f' {key}="{value}"'
else:
comments += f" {key}={value}"
except Exception:
pass
with open(filename, "wb") as op:
op.write(f"{data.shape[0]}\n".encode())
op.write(f"{comments}\n".encode())
data.write_csv(op, separator=" ", include_header=False,
float_precision=10)
_save_with_compression(
_write_xyz_internal,
output_name,
compress,
box=box,
data=data,
classical=classical,
**kargs,
)
[docs]
@staticmethod
def write_poscar(
output_name: str,
box: Box,
data: pl.DataFrame,
reduced_pos: bool = False,
selective_dynamics: bool = False,
compress: bool = False,
):
"""
Write system to POSCAR file (with optional compression).
Parameters
----------
output_name : str
Output file path.
box : mdapy.box.Box
Simulation box.
data : pl.DataFrame
Atom data with 'element', 'x', 'y', 'z'.
reduced_pos : bool, optional
Use reduced coordinates.
selective_dynamics : bool, optional
Include selective dynamics.
compress : bool, optional
Compress output to .gz format.
"""
for col in ["element", "x", "y", "z"]:
if col not in data.columns:
raise ValueError(f"data must contain {col} column")
def _write_poscar_internal(
filename, box: Box, data: pl.DataFrame, reduced_pos, selective_dynamics
):
data = data.sort("element").with_columns(
pl.col("x") - box.origin[0],
pl.col("y") - box.origin[1],
pl.col("z") - box.origin[2],
)
element_list = data["element"].unique().sort()
element_number = [
data.filter(pl.col("element") == i).shape[0] for i in element_list
]
if reduced_pos:
new_pos = np.dot(data.select("x", "y", "z").to_numpy(), box.inverse_box)
data = data.with_columns(
pl.lit(new_pos[:, 0]).alias("x"),
pl.lit(new_pos[:, 1]).alias("y"),
pl.lit(new_pos[:, 2]).alias("z"),
)
if selective_dynamics:
for col in ["sdx", "sdy", "sdz"]:
if col not in data.columns:
raise ValueError(
f"data must contain {col} if selective_dynamics=True"
)
with open(filename, "wb") as op:
op.write("# VASP POSCAR file written by mdapy.\n".encode())
op.write("1.0000000000\n".encode())
op.write("{:.10f} {:.10f} {:.10f}\n".format(*box.box[0]).encode())
op.write("{:.10f} {:.10f} {:.10f}\n".format(*box.box[1]).encode())
op.write("{:.10f} {:.10f} {:.10f}\n".format(*box.box[2]).encode())
for aname in element_list:
op.write(f"{aname} ".encode())
op.write("\n".encode())
for atype in element_number:
op.write(f"{atype} ".encode())
op.write("\n".encode())
if selective_dynamics:
op.write("Selective dynamics\n".encode())
if reduced_pos:
op.write("Direct\n".encode())
else:
op.write("Cartesian\n".encode())
if selective_dynamics:
data.select(["x", "y", "z", "sdx", "sdy", "sdz"]).write_csv(
op, separator=" ", include_header=False, float_precision=10
)
else:
data.select(["x", "y", "z"]).write_csv(
op, separator=" ", include_header=False, float_precision=10
)
_save_with_compression(
_write_poscar_internal,
output_name,
compress,
box=box,
data=data,
reduced_pos=reduced_pos,
selective_dynamics=selective_dynamics,
)
[docs]
@staticmethod
def write_data(
output_name: str,
box: Box,
data: pl.DataFrame,
element_list: Optional[List] = None,
num_type: Optional[int] = None,
data_format: str = "atomic",
compress: bool = False,
):
"""
Write system to LAMMPS data file (with optional compression).
Parameters
----------
output_name : str
Output file path.
box : mdapy.box.Box
Simulation box.
data : pl.DataFrame
Atom data with 'type', 'x', 'y', 'z'.
element_list : Optional[List], optional
Element names for atom types.
num_type : Optional[int], optional
Number of atom types.
data_format : str, optional
'atomic' or 'charge'.
compress : bool, optional
Compress output to .gz format.
"""
if not isinstance(data, pl.DataFrame):
raise TypeError("data must be polars DataFrame")
for col in ["type", "x", "y", "z"]:
if col not in data.columns:
raise ValueError(f"data must contain {col} column")
def _write_data_internal(
filename, box, data, element_list, num_type, data_format
):
if "id" not in data.columns:
data = data.with_row_index("id", offset=1)
need_rotation = False
if box.box[0, 1] != 0 or box.box[0, 2] != 0 or box.box[1, 2] != 0:
need_rotation = True
if need_rotation:
pos = data.select("x", "y", "z").to_numpy(writable=True)
box, rotate = box.align_to_lammps_box()
pos -= box.origin
box.origin[:] = 0
pos = pos @ rotate
data = data.with_columns(
pl.lit(pos[:, 0]).alias("x"),
pl.lit(pos[:, 1]).alias("y"),
pl.lit(pos[:, 2]).alias("z"),
)
if data_format not in ["atomic", "charge"]:
raise ValueError(
f"Unrecognized data format: {data_format}. "
"Only support atomic and charge"
)
type_max = int(data["type"].max())
if num_type is None:
num_type = type_max
else:
if num_type < type_max:
raise ValueError(
f"num_type ({num_type}) is less than the largest type "
f"in data ({type_max})"
)
if element_list is not None:
if len(element_list) < num_type:
raise ValueError(
f"element_list has {len(element_list)} entries but "
f"num_type is {num_type}; need at least {num_type}"
)
# Honour the user's explicit num_type; just warn when the
# element_list is longer (extra entries are unused but
# listed in Masses for forward compatibility).
if len(element_list) > num_type:
import warnings
warnings.warn(
f"element_list has {len(element_list)} entries but "
f"num_type={num_type}; writing only the first "
f"{num_type} elements.",
stacklevel=2,
)
element_list = list(element_list)[:num_type]
for i in element_list:
if i not in atomic_numbers:
raise ValueError(f"Unrecognized element name {i}")
with open(filename, "wb") as op:
op.write("# LAMMPS data file written by mdapy.\n\n".encode())
op.write(f"{data.shape[0]} atoms\n{num_type} atom types\n\n".encode())
op.write(
f"{box.origin[0]} {box.origin[0] + box.box[0, 0]} xlo xhi\n".encode()
)
op.write(
f"{box.origin[1]} {box.origin[1] + box.box[1, 1]} ylo yhi\n".encode()
)
op.write(
f"{box.origin[2]} {box.origin[2] + box.box[2, 2]} zlo zhi\n".encode()
)
xy, xz, yz = box.box[1, 0], box.box[2, 0], box.box[2, 1]
if xy != 0 or xz != 0 or yz != 0:
op.write(f"{xy} {xz} {yz} xy xz yz\n".encode())
op.write("\n".encode())
if element_list is not None:
op.write("Masses\n\n".encode())
for i, j in enumerate(element_list, start=1):
mass = atomic_masses[atomic_numbers[j]]
op.write(f"{i} {mass} # {j}\n".encode())
op.write("\n".encode())
op.write(rf"Atoms # {data_format}".encode())
op.write("\n\n".encode())
if data_format == "atomic":
table = data.select(["id", "type", "x", "y", "z"])
elif data_format == "charge":
if "q" not in data.columns:
table = data.with_columns(pl.lit(0.0).alias("q")).select(
["id", "type", "q", "x", "y", "z"]
)
else:
table = data.select(["id", "type", "q", "x", "y", "z"])
# float_precision=10 keeps positions / velocities round-
# trippable to ~9 significant digits without writing huge
# default-precision strings.
table.write_csv(op, separator=" ", include_header=False,
float_precision=10)
if all(col in data.columns for col in ["vx", "vy", "vz"]):
op.write("\nVelocities\n\n".encode())
table = data.select(["id", "vx", "vy", "vz"])
table.write_csv(op, separator=" ", include_header=False,
float_precision=10)
_save_with_compression(
_write_data_internal,
output_name,
compress,
box=box,
data=data,
element_list=element_list,
num_type=num_type,
data_format=data_format,
)
[docs]
@staticmethod
def write_dump(
output_name: str,
box: Box,
data: pl.DataFrame,
timestep: float = 0.0,
compress: bool = False,
):
"""
Write system to LAMMPS dump file (with optional compression).
Parameters
----------
output_name : str
Output file path.
box : mdapy.box.Box
Simulation box.
data : pl.DataFrame
Atom data with 'type', 'x', 'y', 'z'.
timestep : float, optional
Timestep value.
compress : bool, optional
Compress output to .gz format.
"""
if not isinstance(data, pl.DataFrame):
raise TypeError("data must be polars DataFrame")
for col in ["type", "x", "y", "z"]:
if col not in data.columns:
raise ValueError(f"data must contain {col} column")
def _write_dump_internal(filename, box, data, timestep):
with open(filename, "wb") as op:
SaveSystem.write_dump_frame_to(op, box, data, timestep)
_save_with_compression(
_write_dump_internal,
output_name,
compress,
box=box,
data=data,
timestep=timestep,
)
[docs]
@staticmethod
def write_dump_frame_to(op, box: Box, data: pl.DataFrame,
timestep: float = 0.0) -> None:
"""Write one LAMMPS dump frame to an already-open binary file
handle ``op``. Used by both :py:meth:`write_dump` (single-frame)
and :class:`mdapy.Trajectory` (multi-frame).
"""
if "id" not in data.columns:
data = data.with_row_index("id", offset=1)
# Triclinic dumps require lammps-aligned axes; rotate if needed.
if box.box[0, 1] != 0 or box.box[0, 2] != 0 or box.box[1, 2] != 0:
pos = data.select("x", "y", "z").to_numpy(writable=True)
box, rotate = box.align_to_lammps_box()
pos -= box.origin
box.origin[:] = 0
pos = pos @ rotate
data = data.with_columns(
pl.lit(pos[:, 0]).alias("x"),
pl.lit(pos[:, 1]).alias("y"),
pl.lit(pos[:, 2]).alias("z"),
)
# Keep numeric columns + the optional `element` string column
# (newer LAMMPS dumps support `element` via `dump_modify ...
# element X Y Z`). Other string columns are dropped.
keep = [c for c, dt in zip(data.columns, data.dtypes)
if dt.is_numeric() or c == "element"]
data = data.select(keep)
boundary2str = ["pp" if i == 1 else "ss" for i in box.boundary]
op.write(f"ITEM: TIMESTEP\n{timestep}\n".encode())
op.write(f"ITEM: NUMBER OF ATOMS\n{data.shape[0]}\n".encode())
xlo, ylo, zlo = box.origin
xhi = xlo + box.box[0, 0]
yhi = ylo + box.box[1, 1]
zhi = zlo + box.box[2, 2]
xy, xz, yz = box.box[1, 0], box.box[2, 0], box.box[2, 1]
if xy != 0 or xz != 0 or yz != 0:
xlo_bound = xlo + min(0.0, xy, xz, xy + xz)
xhi_bound = xhi + max(0.0, xy, xz, xy + xz)
ylo_bound = ylo + min(0.0, yz)
yhi_bound = yhi + max(0.0, yz)
op.write(
f"ITEM: BOX BOUNDS xy xz yz {boundary2str[0]} "
f"{boundary2str[1]} {boundary2str[2]}\n".encode()
)
op.write(f"{xlo_bound} {xhi_bound} {xy}\n".encode())
op.write(f"{ylo_bound} {yhi_bound} {xz}\n".encode())
op.write(f"{zlo} {zhi} {yz}\n".encode())
else:
op.write(
f"ITEM: BOX BOUNDS {boundary2str[0]} "
f"{boundary2str[1]} {boundary2str[2]}\n".encode()
)
op.write(f"{xlo} {xhi}\n".encode())
op.write(f"{ylo} {yhi}\n".encode())
op.write(f"{zlo} {zhi}\n".encode())
op.write(("ITEM: ATOMS " + " ".join(data.columns) + " \n").encode())
data.write_csv(op, separator=" ", include_header=False,
float_precision=10)
if __name__ == "__main__":
pass