Source code for mammos_spindynamics.uppasd._data

"""UppASD Data class."""

from __future__ import annotations

import pathlib
import re
from io import StringIO
from typing import TYPE_CHECKING

import mammos_entity as me
import mammos_units as u
import numpy as np
import pandas as pd
import yaml

if TYPE_CHECKING:
    import mammos_entity
    import numpy
    import pandas


[docs] def read(out: pathlib.Path | str) -> MammosUppasdData | RunData | TemperatureSweepData: """Read UppASD calculations results directory.""" out = pathlib.Path(out) if not out.is_dir(): raise RuntimeError(f"Output directory {out} does not exist.") if (info_yaml := out / "mammos_spindynamics.yaml").is_file(): with open(info_yaml) as f: info = yaml.safe_load(f) if info["metadata"]["mode"] == "mammos_uppasd_data": return MammosUppasdData(out) elif info["metadata"]["mode"] == "run": return RunData(out) elif info["metadata"]["mode"] == "temperature_sweep": return TemperatureSweepData(out) else: mode = info["metadata"]["mode"] raise RuntimeError( f"Unable to understand mode: '{mode}' in mammos_spindynamics.yaml. " ) else: raise RuntimeError(f"mammos_spindynamics.yaml file not found in path: {out}.")
[docs] class MammosUppasdData: """Collection of UppASD Result instances.""" def __init__(self, out: pathlib.Path | str): """Initialize MammosUppasdData given the directory containing all runs.""" self.out = pathlib.Path(out) def __repr__(self): """Define repr.""" return f"MammosUppasdData('{self.out}')" def __getitem__(self, idx): """Extract i-th run.""" runs = [ run_dir for run_dir in self.out.iterdir() if re.match(r"\d+-(run|temperature_sweep)$", run_dir.name) ] runs.sort(key=lambda x: int(x.name.split("-")[0])) return read(runs[idx])
[docs] def get(self, **kwargs): """Extract run based on different filters.""" df = self.info() for key, value in kwargs.items(): df = df[df[key] == value] if len(df) == 0: raise LookupError( f"Requested run not found.\nSelection: {kwargs}\n" f"Available data: {self.info()}" ) if len(df) > 1: error_string = ( "Too many results. Please refine your search.\n" + "Avilable materials based on request:\n" ) error_string += str(df) raise LookupError(error_string) return read(self.out / df.iloc[0]["name"])
[docs] def info( self, include_description: bool = True, include_time_elapsed: bool = True, include_parameters: bool = True, ) -> pandas.DataFrame: """Dataframe containing information about all available UppASD runs.""" metadata_keys = [] if include_description: metadata_keys += ["description"] if include_time_elapsed: metadata_keys += ["time_elapsed"] all_runs = [] index = [] for run in self: if run: info_ = {"name": run.out.name} index.append(int(run.out.name.split("-")[0])) if metadata_keys: metadata_ = {k: run.metadata.get(k) for k in metadata_keys} info_ = {**info_, **metadata_} if include_parameters: info_ = {**info_, **run.parameters} all_runs.append(info_) return pd.DataFrame(all_runs, index=index)
[docs] class RunData: """UppASD Data parser class for a single run.""" def __init__(self, out: pathlib.Path): """Initialize Data given the output directory of a single run.""" self.out = pathlib.Path(out) with open(self.out / "mammos_spindynamics.yaml") as f: info = yaml.safe_load(f) self._input_dictionary = _parse_inpsd_file(self.inpsd) self.metadata = info["metadata"] self.parameters = info["parameters"] df = pd.read_csv(self.cumulants, sep=r"\s+") self._cumulant_data = df.iloc[-1] def __repr__(self): """Define repr.""" return f"RunData('{self.out}')"
[docs] def info( self, include_description: bool = True, include_time_elapsed: bool = True, include_parameters: bool = True, ) -> pandas.DataFrame: """Return information about the UppASD run.""" out = {"name": [self.out.name]} if include_description: out["description"] = [self.metadata["description"]] if include_time_elapsed: out["time_elapsed"] = [self.metadata["time_elapsed"]] if include_parameters: parameters_dictionary = {k: [v] for k, v in self.parameters.items()} out = {**out, **parameters_dictionary} return pd.DataFrame(out)
@property def T(self) -> float: """Get Thermodynamics Temperature.""" return me.T(self._input_dictionary["temp"]) @property def inpsd(self) -> pathlib.Path: """Get path of ``inpsd.dat`` input file.""" return pathlib.Path(self.out / "inpsd.dat") @property def exchange(self) -> pathlib.Path: """Get path of file containing exchange interactions.""" return self.out / self._input_dictionary["exchange"] @property def momfile(self) -> pathlib.Path: """Get path of file containing magnetic moments.""" return self.out / self._input_dictionary["momfile"] @property def posfile(self) -> pathlib.Path: """Get path of file containing atomic positions.""" return self.out / self._input_dictionary["posfile"] @property def cumulants(self) -> pathlib.Path: """Get ``cumulants.*.out`` file.""" cumulants = list(self.out.glob("cumulants.*.out")) if not cumulants: raise RuntimeError(f"Could not find a cumulants.<simid>.out in {self.out}") if len(cumulants) > 1: # we assume that this will never happen raise RuntimeError( f"Found multiple candidates for cumulants.<simid>.out in {self.out}:\n" f"{cumulants}" ) return cumulants[0] @property def restartfile(self) -> pathlib.Path: """Get path of restart file.""" restart_files = list(self.out.glob("restart.*.out")) if not restart_files: raise RuntimeError(f"Could not find restart.<simid>.out in {self.out}") if len(restart_files) > 1: # we assume that this will never happen raise RuntimeError( f"Found multiple candidates for restart.<simid>.out in {self.out}:\n" f"{restart_files}" ) return restart_files[0] @property def n_magnetic_atoms(self) -> int: """Get number of magnetic atoms.""" with open(self.momfile, encoding="utf-8") as file: n = len(file.readlines()) return n @property def Ms(self) -> mammos_entity.Entity: """Get Spontaneous Magnetization.""" cell = self._input_dictionary["cell"] lattice_const = self._input_dictionary["alat"] * u.m cell_volume = np.dot(cell[0], np.cross(cell[1], cell[2])) * lattice_const**3 Ms_mu_B_per_atom = float(self._cumulant_data["<M>"]) * u.mu_B Ms = Ms_mu_B_per_atom * self.n_magnetic_atoms / cell_volume return me.Ms(Ms, unit="A/m") @property def Cv(self) -> mammos_entity.Entity: """Get specific heat capacity.""" k_B = u.constants.k_B Cv = float(self._cumulant_data["C_v(tot)"]) * k_B * self.n_magnetic_atoms return me.Entity("IsochoricHeatCapacity", Cv) @property def E(self) -> mammos_entity.Entity: """Get energy.""" E = float(self._cumulant_data["<E>"]) * u.mRy * self.n_magnetic_atoms return me.Entity("Energy", E, unit="J") @property def U_binder(self) -> float: """Get U_binder value.""" U_b = float(self._cumulant_data["U_{Binder}"]) return U_b
[docs] class TemperatureSweepData: """Class for the result of the temperature_array runner.""" def __init__(self, out: pathlib.Path | str): """Initialize TemperatureSweepData given the output directory of a sweep run.""" self.out = pathlib.Path(out) with open(self.out / "mammos_spindynamics.yaml") as f: info = yaml.safe_load(f) self.metadata = info["metadata"] self.parameters = info["parameters"] def __repr__(self): """Define repr.""" return f"TemperatureSweepData('{self.out}')" def __getitem__(self, idx): """Extract i-th run.""" runs = [ run_dir for run_dir in self.out.iterdir() if re.match(r"\d+-run", run_dir.name) ] runs.sort(key=lambda x: int(x.name.split("-")[0])) return read(runs[idx])
[docs] def info( self, include_description: bool = True, include_time_elapsed: bool = True, include_parameters: bool = True, ) -> pandas.DataFrame: """Dataframe containing information about all available UppASD runs.""" metadata_keys = [] if include_description: metadata_keys += ["description"] if include_time_elapsed: metadata_keys += ["time_elapsed"] all_runs = [] index = [] for run in self: if run: info_ = {"name": run.out.name} index.append( int( run.out.name.replace("-run", "").replace( "-temperature_sweep", "" ) ) ) if metadata_keys: metadata_ = {k: run.metadata[k] for k in metadata_keys} info_ = {**info_, **metadata_} if include_parameters: info_ = {**info_, **run.parameters} all_runs.append(info_) return pd.DataFrame(all_runs, index=index)
[docs] def get(self, **kwargs) -> RunData: """Select run satisfying certain filters defined in the keyword arguments. If there are multiple matches, it returns the first one. """ df = self.info() for key, val in kwargs.items(): df = df[df[key] == val] if len(df) == 0: raise LookupError( f"Requested run not found.\nSelection: {kwargs}\n" f"Available data: {self.info()}" ) if len(df) > 1: error_string = ( "Too many results. Please refine your search.\n" + "Avilable materials based on request:\n" ) error_string += str(df) raise LookupError(error_string) return read(self.out / df.iloc[0]["name"])
@property def T(self) -> mammos_entity.Entity: """Get Thermodynamics Temperature.""" return me.concat_flat(*[run.T for run in self if run]) @property def Ms(self) -> mammos_entity.Entity: """Get Spontaneous Magnetization.""" return me.concat_flat(*[run.Ms for run in self if run]) @property def Cv(self) -> mammos_entity.Entity: """Get Isochoric Heat Capacity. The isochorich (at constant volume) heat capacity Cv is evaluated as the derivative of the energy as a function of temperature. """ k_B = u.constants.k_B.value Cv = np.gradient(self.E.value / k_B, self.T.value, axis=0) return me.Entity("IsochoricHeatCapacity", Cv) @property def U_binder(self) -> numpy.ndarray: """Get Binder coefficient.""" return np.array([run.U_binder for run in self if run]) @property def E(self) -> mammos_entity.Entity: """Get Energy.""" return me.concat_flat(*[run.E for run in self if run])
[docs] def save_output(self, out: pathlib.Path | str) -> None: """Save output files M(T) and output.csv in directory `out`. `M(T)` contains all information evaluated from the cumulant files. `output.csv` contains entities for information temperature, magnetization, Binder cumulant and heat capacity. """ out = pathlib.Path(out) out.mkdir(parents=True, exist_ok=True) with open(self[0].cumulants) as f: lines = f.readlines() header = lines[0] with open(out / "M(T)", "w") as f: f.write(f"{'T':>5} {header}") for run in self: if run: with open(run.cumulants) as f_run: lines = f_run.readlines() f.write(f"{run.T.value:>5.0f} {lines[-1]}") me.io.entities_to_file( out / "output.csv", "Magnetization and heat capacity from UppASD", T=self.T, Ms=self.Ms, U_binder=self.U_binder, Cv=self.Cv, E=self.E, )
def _parse_inpsd_file(inpsd_file: pathlib.Path | str) -> dict: """Parse inpsd.dat file.""" string_parameters = { "simid", "exchange", "momfile", "posfile", "restartfile", } float_parameters = { "alat", "temp", } with open(inpsd_file, encoding="utf-8") as file: lines = file.readlines() parameters = {} for i, line in enumerate(lines): for par in string_parameters: if re.match(f"{par} .*", line): parameters[par] = line.split()[1] for par in float_parameters: if re.match(f"{par} .*", line): try: parameters[par] = float(line.split()[1]) except ValueError: continue if re.match("(bc|BC) .*", line): parameters["bc"] = line.removeprefix("bc").split()[:3] if re.match("cell .*", line): a = np.genfromtxt(StringIO(line.removeprefix("cell"))) b = np.genfromtxt(StringIO(lines[i + 1])) c = np.genfromtxt(StringIO(lines[i + 2])) parameters["cell"] = np.vstack((a, b, c)) if re.match("ncell .*", line): parameters["ncell"] = np.genfromtxt(StringIO(line.removeprefix("ncell")))[ :3 ] return parameters