# Copyright (c) 2022, mushroomfire in Beijing Institute of Technology
# This file is from the mdapy project, released under the BSD 3-Clause License.
import gzip
import numpy as np
import polars as pl
import os
import shutil
from tempfile import mkdtemp
try:
from .pigz import compress_file
except Exception:
from pigz import compress_file
[docs]
class SaveFile:
[docs]
@staticmethod
def write_xyz(output_name, box, data, boundary, classical=False):
assert isinstance(output_name, str)
assert isinstance(box, np.ndarray)
assert box.shape == (3, 2) or box.shape == (4, 3)
if box.shape == (3, 2):
new_box = np.zeros((4, 3), dtype=box.dtype)
new_box[0, 0], new_box[1, 1], new_box[2, 2] = box[:, 1] - box[:, 0]
new_box[-1] = box[:, 0]
else:
assert box[0, 1] == 0
assert box[0, 2] == 0
assert box[1, 2] == 0
new_box = box
assert isinstance(data, pl.DataFrame)
for col in ["x", "y", "z"]:
assert col in data.columns, f"data must contain {col}."
assert (
"type" in data.columns or "type_name" in data.columns
), f"data must contain type or type_name."
if classical:
with open(output_name, "wb") as op:
op.write(f"{data.shape[0]}\n".encode())
op.write("Classical XYZ file written by mdapy.\n".encode())
if "type_name" in data.columns:
data.select("type_name", "x", "y", "z").write_csv(
op, separator=" ", include_header=False
)
else:
data.select("type", "x", "y", "z").write_csv(
op, separator=" ", include_header=False
)
else:
properties = []
for name, dtype in zip(data.columns, data.dtypes):
if dtype in pl.INTEGER_DTYPES:
ptype = "I"
elif dtype in pl.FLOAT_DTYPES:
ptype = "R"
elif dtype == pl.Utf8:
ptype = "S"
else:
raise f"Unrecognized data type {dtype}."
if name == "type_name":
properties.append(f"species:{ptype}:1")
else:
properties.append(f"{name}:{ptype}:1")
properties_str = "Properties=" + ":".join(properties).replace(
"x:R:1:y:R:1:z:R:1", "pos:R:3"
)
if "vx:R:1:vy:R:1:vz:R:1" in properties_str:
properties_str = properties_str.replace(
"vx:R:1:vy:R:1:vz:R:1", "velo:R:3"
)
if "fx:R:1:fy:R:1:fz:R:1" in properties_str:
properties_str = properties_str.replace(
"fx:R:1:fy:R:1:fz:R:1", "force:R:3"
)
lattice_str = (
"Lattice="
+ '"'
+ " ".join(new_box[:-1].flatten().astype(str).tolist())
+ '"'
)
pbc_str = (
"pbc="
+ '"'
+ " ".join(["T" if i == 1 else "F" for i in boundary])
+ '"'
)
origin_str = 'Origin="' + " ".join(new_box[-1].astype(str).tolist()) + '"'
comments = (
lattice_str + " " + properties_str + " " + pbc_str + " " + origin_str
)
with open(output_name, "wb") as op:
op.write(f"{data.shape[0]}\n".encode())
if "type_name" in data.columns and "type" in data.columns:
if "type:I:1:" in comments:
comments = comments.replace("type:I:1:", "")
if ":type:I:1" in comments:
comments = comments.replace(":type:I:1", "")
op.write(f"{comments}\n".encode())
data.select(pl.all().exclude("type")).write_csv(
op, separator=" ", include_header=False
)
else:
op.write(f"{comments}\n".encode())
data.write_csv(op, separator=" ", include_header=False)
[docs]
@staticmethod
def write_POSCAR(
output_name,
box,
data,
type_name=None,
reduced_pos=False,
selective_dynamics=False,
save_velocity=False,
):
assert isinstance(output_name, str)
assert isinstance(box, np.ndarray)
assert box.shape == (3, 2) or box.shape == (4, 3)
if box.shape == (3, 2):
new_box = np.zeros((3, 3), dtype=box.dtype)
new_box[0, 0], new_box[1, 1], new_box[2, 2] = box[:, 1] - box[:, 0]
else:
assert box[0, 1] == 0
assert box[0, 2] == 0
assert box[1, 2] == 0
new_box = box[:-1]
assert isinstance(data, pl.DataFrame)
for col in ["type", "x", "y", "z"]:
assert col in data.columns, f"data must contain {col}."
data = (
data.select(pl.all())
.sort("type")
.with_columns(
pl.col("x") - pl.col("x").min(),
pl.col("y") - pl.col("y").min(),
pl.col("z") - pl.col("z").min(),
)
)
if reduced_pos:
new_pos = np.c_[data["x"], data["y"], data["z"]] @ np.linalg.pinv(new_box.T)
data = data.with_columns(
pl.lit(new_pos[:, 0]).alias("x"),
pl.lit(new_pos[:, 1]).alias("y"),
pl.lit(new_pos[:, 2]).alias("z"),
)
type_list = data.group_by("type", maintain_order=True).count()["count"]
if type_name is not None:
assert len(type_name) == type_list.shape[0]
else:
if "type_name" in data.columns:
type_name = data["type_name"].unique(maintain_order=True)
assert len(type_name) == type_list.shape[0]
if selective_dynamics:
for col in ["sdx", "sdy", "sdz"]:
assert (
col in data.columns
), f"data mush contain {col} if you set selective_dynamics to True."
if save_velocity:
for col in ["vx", "vy", "vz"]:
assert (
col in data.columns
), f"data mush contain {col} if you set save_velocity to True."
with open(output_name, "wb") as op:
op.write("# VASP POSCAR file written by mdapy.\n".encode())
op.write("1.0000000000\n".encode())
op.write("{:.10f} {:.10f} {:.10f}\n".format(*new_box[0]).encode())
op.write("{:.10f} {:.10f} {:.10f}\n".format(*new_box[1]).encode())
op.write("{:.10f} {:.10f} {:.10f}\n".format(*new_box[2]).encode())
if type_name is not None:
for aname in type_name:
op.write(f"{aname} ".encode())
op.write("\n".encode())
for atype in type_list:
op.write(f"{atype} ".encode())
op.write("\n".encode())
if selective_dynamics:
op.write("Selective dynamics\n".encode())
if reduced_pos:
op.write("Direct\n".encode())
else:
op.write("Cartesian\n".encode())
if selective_dynamics:
data.select(["x", "y", "z", "sdx", "sdy", "sdz"]).write_csv(
op, separator=" ", include_header=False, float_precision=10
)
else:
data.select(["x", "y", "z"]).write_csv(
op, separator=" ", include_header=False, float_precision=10
)
if save_velocity:
op.write("Cartesian\n".encode())
data.select(["vx", "vy", "vz"]).write_csv(
op, separator=" ", include_header=False, float_precision=10
)
[docs]
@staticmethod
def write_data(
output_name,
box,
data=None,
pos=None,
type_list=None,
num_type=None,
data_format="atomic",
):
assert isinstance(output_name, str)
assert isinstance(box, np.ndarray)
assert box.shape == (3, 2) or box.shape == (4, 3)
if box.shape == (3, 2):
new_box = np.zeros((4, 3), dtype=box.dtype)
new_box[0, 0], new_box[1, 1], new_box[2, 2] = box[:, 1] - box[:, 0]
new_box[-1] = box[:, 0]
else:
assert box[0, 1] == 0
assert box[0, 2] == 0
assert box[1, 2] == 0
new_box = box
assert data_format in [
"atomic",
"charge",
], "Unrecgonized data format. Only support atomic and charge."
if isinstance(data, pl.DataFrame):
for col in ["id", "type", "x", "y", "z"]:
assert col in data.columns
else:
assert pos.shape[1] == 3
if type_list is None:
type_list = np.ones(pos.shape[0], int)
else:
assert len(type_list) == pos.shape[0]
type_list = np.array(type_list, int)
data = pl.DataFrame(
{
"id": np.arange(pos.shape[0]) + 1,
"type": type_list,
"x": pos[:, 0],
"y": pos[:, 1],
"z": pos[:, 2],
}
)
if num_type is None:
num_type = data["type"].max()
else:
assert isinstance(num_type, int)
assert (
num_type >= data["type"].max()
), f"num_type should be >= {data['type'].max()}."
with open(output_name, "wb") as op:
op.write("# LAMMPS data file written by mdapy.\n\n".encode())
op.write(f"{data.shape[0]} atoms\n{num_type} atom types\n\n".encode())
op.write(
f"{new_box[-1, 0]} {new_box[-1, 0]+new_box[0, 0]} xlo xhi\n".encode()
)
op.write(
f"{new_box[-1, 1]} {new_box[-1, 1]+new_box[1, 1]} ylo yhi\n".encode()
)
op.write(
f"{new_box[-1, 2]} {new_box[-1, 2]+new_box[2, 2]} zlo zhi\n".encode()
)
xy, xz, yz = new_box[1, 0], new_box[2, 0], new_box[2, 1]
if xy != 0 or xz != 0 or yz != 0: # Triclinic box
op.write(f"{xy} {xz} {yz} xy xz yz\n".encode())
op.write("\n".encode())
op.write(rf"Atoms # {data_format}".encode())
op.write("\n\n".encode())
if data_format == "atomic":
table = data.select(["id", "type", "x", "y", "z"])
elif data_format == "charge":
if "q" not in data.columns:
table = data.select(["id", "type", "x", "y", "z"])
table.insert_at_idx(2, pl.Series("q", np.zeros(data.shape[0])))
else:
table = data.select(["id", "type", "q", "x", "y", "z"])
table.write_csv(op, separator=" ", include_header=False)
if "vx" in data.columns and "vy" in data.columns and "vz" in data.columns:
op.write("\nVelocities\n\n".encode())
table = data.select(["id", "vx", "vy", "vz"])
table.write_csv(op, separator=" ", include_header=False)
[docs]
@staticmethod
def write_dump(
output_name,
box,
boundary,
data=None,
pos=None,
type_list=None,
timestep=0,
compress=False,
):
assert isinstance(output_name, str)
assert isinstance(box, np.ndarray)
assert len(boundary) == 3
assert box.shape == (3, 2) or box.shape == (4, 3)
if box.shape == (3, 2):
new_box = np.zeros((4, 3), dtype=box.dtype)
new_box[0, 0], new_box[1, 1], new_box[2, 2] = box[:, 1] - box[:, 0]
new_box[-1] = box[:, 0]
else:
assert box[0, 1] == 0
assert box[0, 2] == 0
assert box[1, 2] == 0
new_box = box
if isinstance(data, pl.DataFrame):
for col in ["id", "type", "x", "y", "z"]:
assert col in data.columns
else:
assert pos.shape[1] == 3
if type_list is None:
type_list = np.ones(pos.shape[0], int)
else:
assert len(type_list) == pos.shape[0]
type_list = np.array(type_list, int)
data = pl.DataFrame(
{
"id": np.arange(pos.shape[0]) + 1,
"type": type_list,
"x": pos[:, 0],
"y": pos[:, 1],
"z": pos[:, 2],
}
)
data = data.select(pl.selectors.by_dtype(pl.NUMERIC_DTYPES))
if compress:
path, name = os.path.split(output_name)
if name.split(".")[-1] == "gz":
name = name[:-3]
temp_dir = mkdtemp()
output_name = os.path.join(temp_dir, name)
boundary2str = ["pp" if i == 1 else "ss" for i in boundary]
with open(output_name, "wb") as op:
op.write(f"ITEM: TIMESTEP\n{timestep}\n".encode())
op.write("ITEM: NUMBER OF ATOMS\n".encode())
op.write(f"{data.shape[0]}\n".encode())
xlo, ylo, zlo = new_box[3]
xhi, yhi, zhi = (
xlo + new_box[0, 0],
ylo + new_box[1, 1],
zlo + new_box[2, 2],
)
xy, xz, yz = new_box[1, 0], new_box[2, 0], new_box[2, 1]
if xy != 0 or xz != 0 or yz != 0: # Triclinic box
xlo_bound = xlo + min(0.0, xy, xz, xy + xz)
xhi_bound = xhi + max(0.0, xy, xz, xy + xz)
ylo_bound = ylo + min(0.0, yz)
yhi_bound = yhi + max(0.0, yz)
zlo_bound = zlo
zhi_bound = zhi
op.write(
f"ITEM: BOX BOUNDS xy xz yz {boundary2str[0]} {boundary2str[1]} {boundary2str[2]}\n".encode()
)
op.write(f"{xlo_bound} {xhi_bound} {xy}\n".encode())
op.write(f"{ylo_bound} {yhi_bound} {xz}\n".encode())
op.write(f"{zlo_bound} {zhi_bound} {yz}\n".encode())
else:
op.write(
f"ITEM: BOX BOUNDS {boundary2str[0]} {boundary2str[1]} {boundary2str[2]}\n".encode()
)
op.write(f"{xlo} {xhi}\n".encode())
op.write(f"{ylo} {yhi}\n".encode())
op.write(f"{zlo} {zhi}\n".encode())
col_name = "ITEM: ATOMS " + " ".join(data.columns) + " \n"
op.write(col_name.encode())
data.write_csv(op, separator=" ", include_header=False)
if compress:
compress_file(output_name, os.path.join(path, name + ".gz"))
shutil.rmtree(temp_dir)
[docs]
class BuildSystem:
[docs]
@classmethod
def fromfile(cls, filename, fmt):
if fmt in ["dump", "dump.gz"]:
return cls.read_dump(filename, fmt)
elif fmt in ["data", "lmp"]:
return cls.read_data(filename)
elif fmt == "POSCAR":
return cls.read_POSCAR(filename)
elif fmt == "xyz":
return cls.read_xyz(filename)
[docs]
@staticmethod
def fromarray(pos, box, boundary, vel, type_list):
assert pos.shape[1] == 3
assert box.shape == (3, 2) or box.shape == (4, 3)
assert len(boundary) == 3
if box.shape == (3, 2):
new_box = np.zeros((4, 3), dtype=box.dtype)
new_box[0, 0], new_box[1, 1], new_box[2, 2] = box[:, 1] - box[:, 0]
new_box[-1] = box[:, 0]
else:
new_box = box
if type_list is None:
type_list = np.ones(pos.shape[0], int)
assert len(type_list) == pos.shape[0]
assert type_list.dtype in [
np.int32,
np.int64,
], "type_list should be int32 or int64"
data = pl.DataFrame(
{
"id": np.arange(pos.shape[0]) + 1,
"type": type_list,
"x": pos[:, 0],
"y": pos[:, 1],
"z": pos[:, 2],
}
)
if vel is not None:
assert vel.shape == pos.shape
data.with_columns(
pl.lit(vel[:, 0]).alias("vx"),
pl.lit(vel[:, 1]).alias("vy"),
pl.lit(vel[:, 2]).alias("vz"),
)
return data, box, boundary
[docs]
@staticmethod
def fromdata(data, box, boundary):
assert "x" in data.columns
assert "y" in data.columns
assert "z" in data.columns
assert len(boundary) == 3
assert box.shape == (3, 2) or box.shape == (4, 3)
if box.shape == (3, 2):
new_box = np.zeros((4, 3), dtype=box.dtype)
new_box[0, 0], new_box[1, 1], new_box[2, 2] = box[:, 1] - box[:, 0]
new_box[-1] = box[:, 0]
else:
new_box = box
return data, box, boundary
[docs]
@staticmethod
def read_xyz(filename):
head = []
with open(filename) as op:
for i in range(3):
head.append(op.readline())
natom = int(head[0].split()[0])
classical = True
if "Lattice" in head[1] and "Properties" in head[1]:
classical = False
info = head[1]
if "pbc=" in info:
pindex = info.index("pbc=") + len("pbc=")
boundary = [
1 if i == "T" or i == "1" else 0
for i in info[pindex + 1 : pindex + 6].split()
]
else:
boundary = [1, 1, 1]
bindex = info.index("Lattice=") + len("Lattice=")
try:
box = np.array(info[bindex:].split("'")[1].split(), float).reshape(3, 3)
except Exception:
box = np.array(info[bindex:].split('"')[1].split(), float).reshape(3, 3)
assert (
box[0, 1] == box[0, 2] == box[1, 2] == 0
), "Only support lammps style box! box[0, 1]==box[0, 2]==box[1, 2]==0."
if "Origin=" in info:
oindex = info.index("Origin=") + len("Origin=")
origin = np.expand_dims(
np.array(info[oindex:].split('"')[1].split(), float), axis=0
)
box = np.r_[box, origin]
pindex = info.index("Properties=") + len("Properties=")
content = info[pindex:].split()[0].split(":")
i = 0
columns = []
schema = {}
while i < len(content) - 2:
n_col = int(content[i + 2])
if content[i + 1] == "S":
dtype = pl.Utf8
elif content[i + 1] == "R":
dtype = pl.Float64
elif content[i + 1] == "I":
dtype = pl.Int64
else:
raise f"Unrecognized type {content[i+1]}."
if (
content[i] == "pos"
and content[i + 1] == "R"
and content[i + 2] == "3"
):
columns.extend(["x", "y", "z"])
schema["x"] = dtype
schema["y"] = dtype
schema["z"] = dtype
elif (
content[i] in ["species", "type_name", "element"]
and content[i + 2] == "1"
):
columns.append("type_name")
schema["type_name"] = dtype
elif (
content[i] == "velo"
and content[i + 1] == "R"
and content[i + 2] == "3"
):
columns.extend(["vx", "vy", "vz"])
schema["vx"] = dtype
schema["vy"] = dtype
schema["vz"] = dtype
elif (
content[i] in ["force", "forces"]
and content[i + 1] == "R"
and content[i + 2] == "3"
):
columns.extend(["fx", "fy", "fz"])
schema["fx"] = dtype
schema["fy"] = dtype
schema["fz"] = dtype
else:
if n_col > 1:
for j in range(n_col):
columns.append(content[i] + f"_{j}")
schema[content[i] + f"_{j}"] = dtype
else:
columns.append(content[i])
schema[content[i]] = dtype
i += 3
else:
boundary = [0, 0, 0]
columns = ["type_name", "x", "y", "z"]
schema = {
"type_name": pl.Utf8,
"x": pl.Float64,
"y": pl.Float64,
"z": pl.Float64,
}
multi_space = False
if head[-1].count(" ") != len(columns) - 1:
multi_space = True
if classical:
if multi_space:
type_name, x, y, z = [], [], [], []
with open(filename) as op:
op.readline() # skip head
op.readline()
for i in range(natom):
content = op.readline().split()
type_name.append(content[0])
x.append(content[1])
y.append(content[2])
z.append(content[3])
df = pl.DataFrame(
{"type_name": type_name, "x": x, "y": y, "z": z}, schema=schema
)
else:
df = pl.read_csv(
filename,
separator=" ",
schema=schema,
skip_rows=2,
new_columns=columns,
columns=range(len(columns)),
has_header=False,
truncate_ragged_lines=True,
)
if df["type_name"][0].isdigit():
df = df.with_columns(pl.col("type_name").cast(int)).rename(
{"type_name": "type"}
)
else:
type_name2type = {
j: i + 1
for i, j in enumerate(df["type_name"].unique(maintain_order=True))
}
df = df.with_columns(
pl.col("type_name").map_dict(type_name2type).alias("type")
)
df = df.with_row_count("id", offset=1)
coor = df.select("x", "y", "z")
box = np.r_[
np.eye(3) * (coor.max() - coor.min()).to_numpy(), coor.min().to_numpy()
]
else:
if multi_space:
data = {}
for i in columns:
data[i] = []
with open(filename) as op:
op.readline() # skip head
op.readline()
for i in range(natom):
for key, j in zip(columns, op.readline().split()):
data[key].append(j)
df = pl.DataFrame(data, schema=schema)
else:
df = pl.read_csv(
filename,
separator=" ",
schema=schema,
skip_rows=2,
new_columns=columns,
columns=range(len(columns)),
has_header=False,
truncate_ragged_lines=True,
)
if "Origin=" not in info:
box = np.r_[box, df.select("x", "y", "z").min().to_numpy()]
if "id" not in df.columns:
df = df.with_row_count("id", offset=1)
if "type" not in df.columns:
if "type_name" in df.columns:
if df["type_name"][0].isdigit():
df = df.with_columns(pl.col("type_name").cast(int)).rename(
{"type_name": "type"}
)
else:
name_list = df["type_name"].unique(maintain_order=True)
name2type = {j: i + 1 for i, j in enumerate(name_list)}
df = df.with_columns(
pl.col("type_name").map_dict(name2type).alias("type")
)
else:
df = df.with_columns(pl.lit(1).alias("type"))
return df, box, boundary
[docs]
@staticmethod
def read_POSCAR(filename):
with open(filename) as op:
file = op.readlines()
scale = float(file[1].strip())
need_rotation = False
box = np.array([i.split() for i in file[2:5]], float) * scale
a, b, c = box
if box[0, 1] != 0 or box[0, 2] != 0 or box[1, 2] != 0:
ax = np.linalg.norm(box[0])
bx = box[1] @ (box[0] / ax)
by = np.sqrt(np.linalg.norm(box[1]) ** 2 - bx**2)
cx = box[2] @ (box[0] / ax)
AXB = np.cross(box[0], box[1])
cy = (box[1] @ box[2] - bx * cx) / by
cz = np.sqrt(np.linalg.norm(box[2]) ** 2 - cx**2 - cy**2)
rotation = np.array([[ax, bx, cx], [0, by, cy], [0, 0, cz]])
box = rotation.T
need_rotation = True
row = 5
type_list, type_name_list = [], []
if file[5].strip()[0].isdigit():
for atype, num in enumerate(file[5].split()):
type_list.extend([atype + 1] * int(num))
row += 1
else:
assert file[6].strip()[0].isdigit()
content = file[5].split()
name_dict = {}
atype = 1
for name in content:
if name not in name_dict.keys():
name_dict[name] = atype
atype += 1
for atype, num in enumerate(file[6].split()):
t_name = content[atype]
type_name_list.extend([t_name] * int(num))
type_list.extend([name_dict[t_name]] * int(num))
row += 2
selective_dynamics = False
if file[row].strip()[0] in ["S", "s"]:
row += 1
selective_dynamics = True
natoms = len(type_list)
pos, sd = [], []
if selective_dynamics:
for line in file[row + 1 : row + 1 + natoms]:
content = line.split()
pos.append(content[:3])
sd.append(content[-3:])
sd = np.array(sd)
else:
for line in file[row + 1 : row + 1 + natoms]:
pos.append(line.split()[:3])
pos = np.array(pos, float)
if file[row][0] in ["C", "c", "K", "k"]:
pos *= scale
else:
pos = (pos @ np.c_[a, b, c]) * scale
if need_rotation:
rotation_M = (
rotation / np.inner(box[0], np.cross(box[1], box[2]))
) @ np.c_[np.cross(b, c), np.cross(c, a), np.cross(a, b)]
pos = (rotation_M @ pos.T).T
row += natoms + 1
vel = []
if row <= len(file) - 1:
if len(file[row].split()) > 0:
if file[row].strip()[0] in ["L", "l"]: # skip the lattice velocities
row += 8
if row + 1 + natoms <= len(file):
vel = np.array([i.split() for i in file[row + 1 : row + 1 + natoms]], float)
if len(file[row].split()) > 0:
if file[row].strip()[0] not in ["C", "c", "K", "k"]:
vel = vel @ np.c_[a, b, c]
if need_rotation:
vel = (rotation_M @ vel.T).T
data = {}
data["id"] = np.arange(1, natoms + 1)
data["type"] = type_list
if len(type_name_list) > 0:
data["type_name"] = type_name_list
data["x"] = pos[:, 0]
data["y"] = pos[:, 1]
data["z"] = pos[:, 2]
if len(sd) > 0:
data["sdx"] = sd[:, 0]
data["sdy"] = sd[:, 1]
data["sdz"] = sd[:, 2]
if len(vel) > 0:
data["vx"] = vel[:, 0]
data["vy"] = vel[:, 1]
data["vz"] = vel[:, 2]
data = pl.DataFrame(data)
return data, np.r_[box, np.array([[0, 0, 0.0]])], [1, 1, 1]
[docs]
@staticmethod
def read_data(filename):
data_head = []
box = np.zeros((4, 3))
row = 0
xy, xz, yz = 0, 0, 0
with open(filename) as op:
while True:
line = op.readline()
data_head.append(line)
content = line.split()
if len(content):
if content[-1] == "atoms":
N = int(content[0])
if len(content) >= 2:
if content[1] == "bond":
raise "Do not support bond style."
if content[-1] == "xhi":
xlo, xhi = float(content[0]), float(content[1])
if content[-1] == "yhi":
ylo, yhi = float(content[0]), float(content[1])
if content[-1] == "zhi":
zlo, zhi = float(content[0]), float(content[1])
if content[-1] == "yz":
xy, xz, yz = (
float(content[0]),
float(content[1]),
float(content[2]),
)
if content[0] == "Atoms":
break
row += 1
box = np.array(
[
[xhi - xlo, 0, 0],
[xy, yhi - ylo, 0],
[xz, yz, zhi - zlo],
[xlo, ylo, zlo],
]
)
boundary = [1, 1, 1]
row += 2 # Coordination part
if data_head[-1].split()[-1] == "atomic":
col_names = ["id", "type", "x", "y", "z"]
elif data_head[-1].split()[-1] == "charge":
col_names = ["id", "type", "q", "x", "y", "z"]
else:
with open(filename) as op:
for _ in range(row):
op.readline()
line = op.readline()
if len(line.split()) == 5:
col_names = ["id", "type", "x", "y", "z"]
elif len(line.split()) == 6:
col_names = ["id", "type", "q", "x", "y", "z"]
else:
raise "Unrecgonized data format. Only support atomic and charge."
data = pl.read_csv(
filename,
separator=" ",
skip_rows=row,
n_rows=N,
new_columns=col_names,
columns=range(len(col_names)),
has_header=False,
truncate_ragged_lines=True,
ignore_errors=True,
)
row += N
try:
vel = pl.read_csv(
filename,
separator=" ",
skip_rows=row + 3,
new_columns=["vx", "vy", "vz"],
columns=range(1, 4),
has_header=False,
truncate_ragged_lines=True,
ignore_errors=True,
)
assert vel.shape[0] == data.shape[0]
data = pl.concat([data, vel], how="horizontal")
except Exception:
pass
return data, box, boundary
[docs]
@staticmethod
def read_dump(filename, fmt):
assert fmt in ["dump", "dump.gz"], "Only support dump or dump.gz format."
dump_head = []
if fmt == "dump":
with open(filename) as op:
for _ in range(9):
dump_head.append(op.readline())
elif fmt == "dump.gz":
with gzip.open(filename) as op:
for _ in range(9):
dump_head.append(op.readline().decode())
timestep = int(dump_head[1].strip())
line = dump_head[4].split()
boundary = [1 if i == "pp" else 0 for i in line[-3:]]
if "xy" in line:
xlo_bound, xhi_bound, xy = np.array(dump_head[5].split(), float)
ylo_bound, yhi_bound, xz = np.array(dump_head[6].split(), float)
zlo_bound, zhi_bound, yz = np.array(dump_head[7].split(), float)
xlo = xlo_bound - min(0.0, xy, xz, xy + xz)
xhi = xhi_bound - max(0.0, xy, xz, xy + xz)
ylo = ylo_bound - min(0.0, yz)
yhi = yhi_bound - max(0.0, yz)
zlo = zlo_bound
zhi = zhi_bound
box = np.array(
[
[xhi - xlo, 0, 0],
[xy, yhi - ylo, 0],
[xz, yz, zhi - zlo],
[xlo, ylo, zlo],
]
)
else:
box = np.array([i.split()[:2] for i in dump_head[5:8]]).astype(float)
xlo, xhi = np.array(dump_head[5].split(), float)
ylo, yhi = np.array(dump_head[6].split(), float)
zlo, zhi = np.array(dump_head[7].split(), float)
box = np.array(
[
[xhi - xlo, 0, 0],
[0, yhi - ylo, 0],
[0, 0, zhi - zlo],
[xlo, ylo, zlo],
]
)
col_names = dump_head[8].split()[2:]
data = pl.read_csv(
filename,
separator=" ",
skip_rows=9,
new_columns=col_names,
columns=range(len(col_names)),
has_header=False,
truncate_ragged_lines=True,
)
return data, box, boundary, timestep