diff --git a/launch.json b/launch.json index c12767ba..2f71cc15 100644 --- a/launch.json +++ b/launch.json @@ -7,6 +7,13 @@ "request": "launch", "program": "${workspaceFolder}/src/kimmdy/cmd.py", "cwd": "${workspaceFolder}/example/alanine_hat_naive/" + }, + { + "name": "KIMMDY custom run", + "type": "python", + "request": "launch", + "program": "${workspaceFolder}/src/kimmdy/cmd.py", + "cwd": "${workspaceFolder}/../../examples/gly-hydrolysis/" } ] } diff --git a/src/kimmdy/constants.py b/src/kimmdy/constants.py index 29af47ba..8a3f4787 100644 --- a/src/kimmdy/constants.py +++ b/src/kimmdy/constants.py @@ -19,6 +19,20 @@ "dihedral_restraints": [0, 1, 2, 3], } +# from gmx data dir +DEFAULT_EDISSOC: dict[tuple[str, str], float] = { # type: ignore + tuple(sorted(list(k))): v for k, v in { + ("C", "N"): 500.0, + ("CA", "C"): 341.0, + ("CA", "N"): 379.0, + ("CA", "CB"): 400.0, + ("CB", "CG"): 400.0, + ("CG", "CD"): 400.0, + ("CD", "CE"): 400.0, + ("CE", "NZ"): 400.0 +}.items() +} + # see https://manual.gromacs.org/current/reference-manual/topologies/topology-file-formats.html FFFUNC = { "mult_proper_dihedral": "9", diff --git a/src/kimmdy/coordinates.py b/src/kimmdy/coordinates.py index 13b1f6e1..10713a39 100644 --- a/src/kimmdy/coordinates.py +++ b/src/kimmdy/coordinates.py @@ -1,39 +1,46 @@ """coordinate, topology and plumed modification functions""" +from enum import Enum import logging from copy import deepcopy from pathlib import Path from typing import Optional, Union +from math import sqrt import MDAnalysis as mda import numpy as np -from kimmdy.constants import REACTIVE_MOLECULEYPE, FFFUNC +from kimmdy.constants import DEFAULT_EDISSOC, REACTIVE_MOLECULEYPE, FFFUNC from kimmdy.parsing import read_plumed, write_plumed from kimmdy.recipe import Place from kimmdy.tasks import TaskFiles from kimmdy.topology.atomic import ( Angle, + AngleType, Bond, + BondType, Dihedral, DihedralType, - InteractionType, Pair, Exclusion, - ImproperDihedralId, Interaction, AtomicType, InteractionTypes, MultipleDihedrals, - ProperDihedralId, ) from kimmdy.topology.ff import FF from kimmdy.topology.topology import MoleculeType, Topology from kimmdy.topology.utils import match_atomic_item_to_atomic_type +from enum import Enum, auto logger = logging.getLogger(__name__) +class PairTransition(Enum): + Morph = auto() + Create = auto() + Vanish = auto() + # coordinates def place_atom( files: TaskFiles, step: Place, ttime: Optional[float] = None @@ -87,47 +94,6 @@ def is_parameterized(entry: Interaction): return entry.c0 is not None and entry.c1 is not None -def get_explicit_MultipleDihedrals( - dihedral_key: tuple[str, str, str, str], - mol: MoleculeType, - dihedrals_in: Optional[MultipleDihedrals], - ff: FF, - periodicity_max: int = 6, -) -> Optional[MultipleDihedrals]: - """Takes a valid dihedral key and returns explicit - dihedral parameters for a given topology - """ - if not dihedrals_in: - return None - - if "" not in dihedrals_in.dihedrals.keys(): - # empty string means implicit parameters - return dihedrals_in - - type_key = [mol.atoms[id].type for id in dihedral_key] - - multiple_dihedrals = MultipleDihedrals( - *dihedral_key, FFFUNC["mult_proper_dihedral"], {} - ) - for periodicity in range(1, periodicity_max + 1): - match_obj = match_atomic_item_to_atomic_type( - type_key, ff.proper_dihedraltypes, str(periodicity) - ) - if match_obj: - assert isinstance(match_obj, DihedralType) - multiple_dihedrals.dihedrals[str(periodicity)] = Dihedral( - *dihedral_key, - funct=FFFUNC["mult_proper_dihedral"], - c0=match_obj.c0, - c1=match_obj.c1, - periodicity=match_obj.periodicity, - ) - - if not multiple_dihedrals.dihedrals: - return None - - return multiple_dihedrals - def get_explicit_or_type( key: tuple[str, ...], @@ -135,6 +101,7 @@ def get_explicit_or_type( interaction_types: InteractionTypes, mol: MoleculeType, periodicity: str = "", + use_state_b: bool = False, ) -> Union[Interaction, AtomicType, None]: """Takes an Interaction and associated key, InteractionTypes, Topology and Periodicity (for dihedrals) and returns an object with the parameters of this Interaction @@ -145,148 +112,184 @@ def get_explicit_or_type( if is_parameterized(interaction): return interaction - type_key = [mol.atoms[x].type for x in key] - match_obj = match_atomic_item_to_atomic_type( + if use_state_b: + type_key = [] + for id in key: + t = getattr(mol.atoms[id], 'typeB', None) + t = t if t else mol.atoms[id].type + type_key.append(t) + else: + type_key = [mol.atoms[x].type for x in key] + + atomic_type = match_atomic_item_to_atomic_type( type_key, interaction_types, periodicity ) - if match_obj: - assert isinstance(match_obj, InteractionType) - # FIXME: This is a property of `match_atomic_item_to_atomic_type` and should - # be tested there. - return match_obj - else: - raise ValueError( - f"Could not find explicit parameters for {[type_key,interaction]} in line or in {interaction_types}." - ) + if atomic_type is None: + m = f"Could not find explicit parameters for {[type_key,interaction]} in line or in {interaction_types}." + logger.error(m) + raise ValueError(m) + return atomic_type -def merge_dihedrals( - dihedral_key: tuple[str, str, str, str], - dihedral_a: Optional[Dihedral], - dihedral_b: Optional[Dihedral], - dihedral_types_a: Union[ - dict[ProperDihedralId, DihedralType], dict[ImproperDihedralId, DihedralType] - ], - dihedral_types_b: Union[ - dict[ProperDihedralId, DihedralType], dict[ImproperDihedralId, DihedralType] - ], - molA: MoleculeType, - molB: MoleculeType, - funct: str, - periodicity: str, -) -> Dihedral: - """Merge one to two Dihedrals or -Types into a Dihedral in free-energy syntax""" - # convert implicit standard ff parameters to explicit, if necessary - if dihedral_a: - parameterizedA = get_explicit_or_type( - dihedral_key, - dihedral_a, - dihedral_types_a, - molA, - periodicity, - ) - else: - parameterizedA = None - - if dihedral_b: - parameterizedB = get_explicit_or_type( - dihedral_key, - dihedral_b, - dihedral_types_b, - molB, - periodicity, - ) - else: - parameterizedB = None +class MoleculeTypeMerger: + """Takes two Topologies and joins them for a smooth free-energy like parameter transition simulation""" + + def __init__( + self, + mol_a: MoleculeType, + mol_b: MoleculeType, + ff: FF, + enable_morph_pairs: bool, + ) -> None: + self.mol_a = mol_a + self.mol_b = mol_b + self.ff = ff + self.enable_morph_pairs = enable_morph_pairs + self.default_morse_well_depth = 300 # [kJ mol^-1] + self.default_beta_for_lj = 19 # matches LJ steepness + + self.affected_pairs = {} + self.new_pairs_bind = {} + + + def merge(self): + """modiefies mol_b, the reactive moleculetype of of top_b, in place""" + logger.info(f"Merging topologies for slow growth") + self.merge_atoms() + self.merge_bonds() + self.merge_pairs() + self.merge_angles() + self.merge_dihedrals() + self.mol_b.find_radicals() + return + + def _get_explicit_MultipleDihedrals( + self, + key: tuple[str, str, str, str], + use_state_b: bool, + use_improper: bool = False, + periodicity_max: int = 6, + ) -> Optional[MultipleDihedrals]: + """Takes a valid dihedral key and returns explicit + dihedral parameters for a given topology + """ + if use_improper: + funct = FFFUNC["mult_improper_dihedral"] + dihedraltypes =self.ff.improper_dihedraltypes + else: + dihedraltypes =self.ff.proper_dihedraltypes + funct = FFFUNC["mult_proper_dihedral"] - # construct parameterized Dihedral - if parameterizedA is not None and parameterizedB is not None: - # same + if use_state_b: + if use_improper: + dihedrals_in = self.mol_b.improper_dihedrals.get(key) + else: + dihedrals_in = self.mol_b.proper_dihedrals.get(key) + else: + if use_improper: + dihedrals_in = self.mol_a.improper_dihedrals.get(key) + else: + dihedrals_in = self.mol_a.proper_dihedrals.get(key) - assert type(parameterizedA) == Dihedral or type(parameterizedA) == DihedralType - assert type(parameterizedB) == Dihedral or type(parameterizedB) == DihedralType + if not dihedrals_in: + return None - dihedralmerge = Dihedral( - *dihedral_key, - funct=funct, - c0=parameterizedA.c0, - c1=parameterizedA.c1, - periodicity=parameterizedA.periodicity, - c3=parameterizedB.c0, - c4=parameterizedB.c1, - c5=parameterizedB.periodicity, - ) - elif parameterizedA is not None: - # breaking - if not ( - type(parameterizedA) == Dihedral or type(parameterizedA) == DihedralType - ): - m = f"parameterizedA {parameterizedA} is not a Dihedral or DihedralType" - logger.warning(m) - raise ValueError(m) - dihedralmerge = Dihedral( - *dihedral_key, - funct=funct, - c0=parameterizedA.c0, - c1=parameterizedA.c1, - periodicity=parameterizedA.periodicity, - c3=parameterizedA.c0, - c4="0.00", - c5=parameterizedA.periodicity, - ) - elif parameterizedB is not None: - # binding - assert type(parameterizedB) == Dihedral or type(parameterizedB) == DihedralType - dihedralmerge = Dihedral( - *dihedral_key, - funct=funct, - c0=parameterizedB.c0, - c1="0.00", - periodicity=parameterizedB.periodicity, - c3=parameterizedB.c0, - c4=parameterizedB.c1, - c5=parameterizedB.periodicity, + # not using implicit parameters + # if "" not in dihedrals_in.dihedrals.keys(): + # # empty string means implicit parameters + # return dihedrals_in + + type_key = [] + for id in key: + if use_state_b: + t = getattr(self.mol_b.atoms[id], 'typeB', None) + t = t if t else self.mol_b.atoms[id].type + else: + t = self.mol_a.atoms[id].type + + type_key.append(t) + + multiple_dihedrals = MultipleDihedrals( + *key, funct=funct, dihedrals={} ) - else: - m = f"Tried to merge two dihedrals of {dihedral_key} but no parameterized dihedrals found!" - logger.error(m) - raise ValueError(m) - return dihedralmerge + for periodicity in range(1, periodicity_max+1): + match_obj = match_atomic_item_to_atomic_type( + id=type_key, types=dihedraltypes, periodicity=str(periodicity) + ) + if match_obj: + assert isinstance(match_obj, DihedralType) + multiple_dihedrals.dihedrals[str(periodicity)] = Dihedral( + *key, + funct=funct, + c0=match_obj.c0, + c1=match_obj.c1, + periodicity=match_obj.periodicity, + ) + if not multiple_dihedrals.dihedrals: + return None -def merge_top_moleculetypes_slow_growth( - molA: MoleculeType, - molB: MoleculeType, - ff: FF, - morph_pairs: bool, -) -> MoleculeType: - """Takes two Topologies and joins them for a smooth free-energy like parameter transition simulation""" + return multiple_dihedrals - def get_LJ_parameters(idx1: str, idx2: str): + def _get_LJ_parameters(self, id1: str, id2: str, use_state_b: bool): """Calculate LJ terms sigma and epsilon from atom types""" - comb_rule = ff.defaults[0][1] + comb_rule = self.ff.defaults[0][1] # fudgeLJ = ff.defaults[0][3] # could be used to compensate fudge in pairs - type1 = ff.atomtypes[molB.atoms[idx1].type] - type2 = ff.atomtypes[molB.atoms[idx2].type] + if use_state_b: + t1 = getattr(self.mol_b.atoms[id1], 'typeB', None) + t2 = getattr(self.mol_b.atoms[id2], 'typeB', None) + t1 = t1 if t1 else self.mol_b.atoms[id1].type + t2 = t2 if t2 else self.mol_b.atoms[id2].type + type1 = self.ff.atomtypes[t1] + type2 = self.ff.atomtypes[t2] + else: + t1 = self.mol_a.atoms[id1].type + t2 = self.mol_a.atoms[id2].type + + # amber fix for breaking/binding atom types without LJ potential + if t1 in ["HW", "HO"]: + logger.debug(f"amber fix for {id1} of type {t1} typeA set to H1") + t1 = "H1" + if t2 in ["HW", "HO"]: + logger.debug(f"amber fix for {id2} of type {t2} typeA set to H1") + t2 = "H1" + + type1 = self.ff.atomtypes[t1] + type2 = self.ff.atomtypes[t2] if comb_rule == "1": - v = np.sqrt(float(type1.sigma) * float(type2.sigma)) - w = np.sqrt(float(type1.epsilon) * float(type2.epsilon)) + sigma = np.sqrt(float(type1.sigma) * float(type2.sigma)) + epsilon = np.sqrt(float(type1.epsilon) * float(type2.epsilon)) elif comb_rule == "2": - v = 0.5 * (float(type1.sigma) + float(type2.sigma)) - w = np.sqrt(float(type1.epsilon) * float(type2.epsilon)) + sigma = 0.5 * (float(type1.sigma) + float(type2.sigma)) + epsilon = np.sqrt(float(type1.epsilon) * float(type2.epsilon)) elif comb_rule == "3": - v = np.sqrt(float(type1.sigma) * float(type2.sigma)) - w = np.sqrt(float(type1.epsilon) * float(type2.epsilon)) + sigma = np.sqrt(float(type1.sigma) * float(type2.sigma)) + epsilon = np.sqrt(float(type1.epsilon) * float(type2.epsilon)) else: raise ValueError("Unknown combination rule of forcefield") - return v, w + logger.info(f"Got LJ parameters for {id1}-{id2} of types {type1.type}-{type2.type}: sigma {sigma} and epsilon {epsilon} from mol_b ({use_state_b})") + return sigma, epsilon + + def _get_morse_parameters(self, atomtypes: tuple[str, str], bond: Bond|None=None): + well_depth = self.default_morse_well_depth + type_key: tuple[str, str] = tuple(sorted(atomtypes)) # type: ignore + d = DEFAULT_EDISSOC.get(type_key) + if d is not None: + well_depth = d + k = bond.c1 if bond else None + if k is not None: + # like in gmx /src/gromacs/gmxpreprocess/tomorse.cpp + beta = sqrt(float(k)/(2*well_depth)) + else: + beta = self.default_beta_for_lj + return well_depth, beta - def make_pair(idx1: str, idx2: str, bind: bool) -> Pair: + def _make_pair(self, id1: str, id2: str, transition: PairTransition) -> Pair: """Generates morphing pair interaction If it is for a binding event, the pair is vanishing, as it will be an @@ -295,38 +298,40 @@ def make_pair(idx1: str, idx2: str, bind: bool) -> Pair: Parameters ---------- - idx1 : str + id1 : str Atom one - idx2 : str + id2 : str Atom two - bind : bool + is_bind : bool Binding or breaking event. Determines morphing direction. + morph_pairs Returns ------- Pair Morphing pair """ - v, w = get_LJ_parameters(idx1, idx2) + sigmaij_a, epsilonij_a = self._get_LJ_parameters(id1=id1, id2=id2, use_state_b=False) + sigmaij_b, epsilonij_b = self._get_LJ_parameters(id1=id1, id2=id2, use_state_b=True) + # defaults ar 0.0 for all c0-c3 c_kwargs = dict( zip( [f"c{i}" for i in range(4)], [f"{0.0:.5f}" for _ in range(4)], ) ) - if morph_pairs: - # Bind: pair interaction turning off - if bind: - c_kwargs["c0"] = f"{v:.5f}" - c_kwargs["c1"] = f"{w:.5f}" - # Break: pair interaction turning on - else: - c_kwargs["c2"] = f"{v:.5f}" - c_kwargs["c3"] = f"{w:.5f}" - - return Pair(idx1, idx2, funct=ff.defaults[0][0], **c_kwargs) - - def merge_pairs(break_pair: Optional[Pair], bind_pair: Optional[Pair]): + if transition == PairTransition.Vanish or transition == PairTransition.Morph: + # pair interaction turning off (bind or morph) + c_kwargs["c0"] = f"{sigmaij_a:.5f}" + c_kwargs["c1"] = f"{epsilonij_a:.5f}" + if transition == PairTransition.Create or transition == PairTransition.Morph: + # pair interaction turning on (break or morph) + c_kwargs["c2"] = f"{sigmaij_b:.5f}" + c_kwargs["c3"] = f"{epsilonij_b:.5f}" + + return Pair(id1, id2, funct=self.ff.defaults[0][0], **c_kwargs) + + def _interpolate_two_pairs(self, break_pair: Optional[Pair], bind_pair: Optional[Pair]): """Merges two morphing pairs into a morphing pair for slow-growth. Takes starting state of break_pair and end state of bind_pair. @@ -342,17 +347,11 @@ def merge_pairs(break_pair: Optional[Pair], bind_pair: Optional[Pair]): Pair Merged pair containing four parameters. """ - - assert len(ps := list(filter(lambda x: x, (break_pair, bind_pair)))) > 0 - ai = ps[0].ai # type: ignore - aj = ps[0].aj # type: ignore - funct = ps[0].funct # type: ignore - - for p in ps: - assert p.c0 is not None, "Pair must contain c0" # type: ignore - assert p.c1 is not None, "Pair must contain c1" # type: ignore - assert p.c2 is not None, "Pair must contain c2" # type: ignore - assert p.c3 is not None, "Pair must contain c3" # type: ignore + given_pair = break_pair if break_pair else bind_pair + assert given_pair is not None, "At least one of break_pair or bind_pair has to be non None" + ai = given_pair.ai + aj = given_pair.aj + funct = given_pair.funct if break_pair and bind_pair: assert ( @@ -365,6 +364,13 @@ def merge_pairs(break_pair: Optional[Pair], bind_pair: Optional[Pair]): break_pair.aj == bind_pair.aj ), "Atoms must be the same in pairs to interpolate between" + # TODO: + for pair in [break_pair, bind_pair]: + if pair is not None and not is_parameterized(pair): + v, w = self._get_LJ_parameters(pair.ai, pair.aj, use_state_b=False) + pair.c0 = f"{v:.5f}" + pair.c1 = f"{w:.5f}" + return Pair( ai, aj, @@ -375,172 +381,326 @@ def merge_pairs(break_pair: Optional[Pair], bind_pair: Optional[Pair]): c3=bind_pair.c3 if bind_pair else break_pair.c3, # type: ignore ) - hyperparameters = { - "morse_well_depth": 300, # [kJ mol-1] - "beta": 19, # matches LJ steepness - } - - # atoms - for nr in molA.atoms.keys(): - atomA = molA.atoms[nr] - atomB = molB.atoms[nr] - if atomA != atomB: - if atomA.charge != atomB.charge: + + def _interpolate_two_dihedrals( + self, + key: tuple[str, str, str, str], + dihedral_a: Optional[Dihedral], + dihedral_b: Optional[Dihedral], + funct: str, + ) -> Dihedral: + """Merge one to two Dihedrals into a Dihedral in free-energy syntax. + + Only one of the dihedrals can be None at any given time. + """ + if funct not in [FFFUNC["mult_proper_dihedral"], FFFUNC["mult_improper_dihedral"]]: + m = f"Can only interpolate between proper (type {FFFUNC['mult_proper_dihedral']}) or improper (type {FFFUNC['mult_proper_dihedral']}) dihedrals" + logger.error(m) + raise ValueError(m) + + # construct parameterized Dihedral + if dihedral_a is not None and dihedral_b is not None: + # both parameters are given + # transition between parameters + # this can be the case when both dihedrals exist + # but the atomtypes of participating atoms change from A to B state + dihedralmerge = Dihedral( + *key, + funct=funct, + c0=dihedral_a.c0, + c1=dihedral_a.c1, + periodicity=dihedral_a.periodicity, + c3=dihedral_b.c0, + c4=dihedral_b.c1, + c5=dihedral_b.periodicity, + ) + elif dihedral_a is not None: + # dihedral only in A + # vanish it by transitiononing to 0 + dihedralmerge = Dihedral( + *key, + funct=funct, + c0=dihedral_a.c0, + c1=dihedral_a.c1, + periodicity=dihedral_a.periodicity, + c3=dihedral_a.c0, + c4="0.00", + c5=dihedral_a.periodicity, + ) + elif dihedral_b is not None: + # dihedral only in B + # create it by transitioning from 0 + dihedralmerge = Dihedral( + *key, + funct=funct, + c0=dihedral_b.c0, + c1="0.00", + periodicity=dihedral_b.periodicity, + c3=dihedral_b.c0, + c4=dihedral_b.c1, + c5=dihedral_b.periodicity, + ) + else: + m = f"Tried to merge two dihedrals of {key} but no parameterized dihedrals found!" + logger.error(m) + raise ValueError(m) + return dihedralmerge + + def merge_atoms(self): + # only iterating over the keys of mol_a + # because KIMMDY can't add or remove atoms, + # the the keys should be the same in a and b + for nr in self.mol_a.atoms.keys(): + atomA = self.mol_a.atoms[nr] + atomB = self.mol_b.atoms[nr] + if not atomA.eq_by_params(atomB): + # set B parameters first + # to not overwrite them atomB.typeB = deepcopy(atomB.type) atomB.type = deepcopy(atomA.type) atomB.chargeB = deepcopy(atomB.charge) atomB.charge = deepcopy(atomA.charge) atomB.massB = deepcopy(atomB.mass) atomB.mass = deepcopy(atomA.mass) - else: - logger.debug( - f"Atom {nr} changed but not the charges!\n\tA:{atomA}\n\tB:{atomB} " - ) - - break_bind_atoms = {} - - # bonds - keysA = set(molA.bonds.keys()) - keysB = set(molB.bonds.keys()) - keys = keysA | keysB - - new_pairs_break = {} - new_pairs_bind = {} - - for key in keys: - interactionA = molA.bonds.get(key) - interactionB = molB.bonds.get(key) - - if interactionA != interactionB: - parameterizedA = get_explicit_or_type(key, interactionA, ff.bondtypes, molA) - parameterizedB = get_explicit_or_type(key, interactionB, ff.bondtypes, molB) - - # both bonds exist - if parameterizedA and parameterizedB: + if atomA.charge == atomB.charge: + logger.debug( + f"Atom {nr} changed but not the charges!\n\tA:{atomA}\n\tB:{atomB} " + ) + + def merge_bonds(self): + bond_keys_a = set(self.mol_a.bonds.keys()) + bond_keys_b = set(self.mol_b.bonds.keys()) + bond_keys = bond_keys_a | bond_keys_b + + for bond_key in bond_keys: + bond_a = self.mol_a.bonds.get(bond_key) + bond_b = self.mol_b.bonds.get(bond_key) + # the bond can either be in A, B or both + # in the first two cases the respective other will be None + + if bond_a is None and bond_b is None: + m = f"Can't find parameters for bond with key {bond_key}, got None for both bonds. This should be an impossible state." + logger.error(m) + raise ValueError(m) + + # if bond_a == bond_b: + # # bonds are equal, nothing to be done + # continue + # may still have to do somthing do to changing atomtypes + + bond_a = get_explicit_or_type( + bond_key, bond_a, self.ff.bondtypes, self.mol_a + ) + bond_b = get_explicit_or_type( + bond_key, bond_b, self.ff.bondtypes, self.mol_b, use_state_b=True + ) + if isinstance(bond_a, BondType): + bond_a = Bond(bond_a.i, bond_a.j, funct=bond_a.funct, c0=bond_a.c0, c1=bond_a.c1) + if isinstance(bond_b, BondType): + bond_b = Bond(bond_b.i, bond_b.j, funct=bond_b.funct, c0=bond_b.c0, c1=bond_b.c1) + assert isinstance(bond_a, Bond) or bond_a is None + assert isinstance(bond_b, Bond) or bond_b is None + + atomtypes_a: tuple[str, str] = tuple([self.mol_a.atoms[atom_id].type for atom_id in bond_key]) # type: ignore + atomtypes_b: tuple[str, str] = tuple([ + t if (t := getattr(self.mol_b.atoms[id], 'typeB', None)) else self.mol_b.atoms[id].type + for id in bond_key + ]) # type: ignore + + sigmaij_a, epsilonij_a = self._get_LJ_parameters(*bond_key, use_state_b=False) + sigmaij_b, epsilonij_b = self._get_LJ_parameters(*bond_key, use_state_b=True) + + depth_a, beta_a = self._get_morse_parameters(atomtypes=atomtypes_a, bond=bond_a) + depth_b, beta_b = self._get_morse_parameters(atomtypes=atomtypes_b, bond=bond_b) + + # both bonds exist-> transition between parameters + if bond_a is not None and bond_b is not None: + if not (isinstance(bond_a, (Bond, BondType)) and isinstance(bond_b, (Bond, BondType))): + m = f"Can't find parameters for bond with key {bond_key}, got explicits/types: {bond_a} and {bond_b} for a bonds: {bond_a}, {bond_b}" + logger.error(m) + raise ValueError(m) if not ( - parameterizedA.funct - == parameterizedB.funct + bond_a.funct + == bond_b.funct == FFFUNC["harmonic_bond"] ): - m = f"In slow-growth, bond functionals need to be harmonic, but {key} is not. It is in A: {parameterizedA} and B: {parameterizedB}" + m = f"In slow-growth, bond functionals need to be harmonic, but {bond_key} is not. It is in A: {bond_a} and B: {bond_b}" logger.error(m) raise ValueError(m) - molB.bonds[key] = Bond( - *key, - funct=parameterizedB.funct, - c0=parameterizedA.c0, - c1=parameterizedA.c1, - c2=parameterizedB.c0, - c3=parameterizedB.c1, + self.mol_b.bonds[bond_key] = Bond( + *bond_key, + funct=bond_b.funct, + c0=bond_a.c0, + c1=bond_a.c1, + c2=bond_b.c0, + c3=bond_b.c1, ) - - # bond only exists in A -> vanish bond - elif parameterizedA: - atomtypes = [molA.atoms[atom_id].type for atom_id in key] - break_bind_atoms[key[0]] = atomtypes[0] - break_bind_atoms[key[1]] = atomtypes[1] - sigmaij, epsilonij = get_LJ_parameters(*key) - - molB.bonds[key] = Bond( - *key, + logger.info(f"Created harmonic bond to bond transition for {bond_key} (transition): {self.mol_b.bonds[bond_key]}") + + # TODO: do we need to add this? + # if self.enable_morph_pairs: + # # Find all neighbors of the bond + # neighbor_sides = [] + # neighbor_set = set() + # for id in bond_key: + # bonds = list(filter(lambda b: id in b, bond_keys)) + # neighbor_set = set() + # for b in bonds: + # neighbor_set.add(b[0]) + # neighbor_set.add(b[1]) + # neighbor_sides.append(neighbor_set) + # + # neighbor_set.discard(bond_key[1]) # avoid double counting central bond + # + # # handle vanishing exclusions neighbors1 - key[1] + # for a1 in neighbor_sides[0]: + # pair_key = tuple(sorted((a1, bond_key[1]), key=int)) + # self.new_pairs_morph[pair_key] = self._make_pair(*pair_key, is_bind=False) + # + # # handle vanishing exclusions neighbors2 - key[0] + # for a2 in neighbor_sides[1]: + # pair_key = tuple(sorted((a2, bond_key[0]), key=int)) + # self.new_pairs_morph[pair_key] = self._make_pair(*pair_key, is_bind=False) + + elif isinstance(bond_a, (Bond, BondType)): + # bond only exists in A -> vanish bond + # transition from a "real" bond + # to one that acts like a LJ potential (=no bond) + self.mol_b.bonds[bond_key] = Bond( + *bond_key, funct=FFFUNC["morse_bond"], - c0=parameterizedA.c0, # b - c1=f"{hyperparameters['morse_well_depth']:.5f}", # D - c2=f"{hyperparameters['beta']:.5f}", # beta - c3=f"{sigmaij*1.12:.5f}", # sigmaij* 1.12 = LJ minimum - c4=f"{0.0:.5f}", # well depth -> zero - c5=f"{0.0:.5f}", + c0=bond_a.c0, # bond distance + c1=f"{depth_a:.5f}", # D + c2=f"{beta_a:.5f}", # beta + c3=f"{sigmaij_b*1.12:.5f}", # sigmaij* 1.12 = LJ minimum + c4=f"{epsilonij_b:.5f}", # well depth for LJ + c5=f"{beta_b:.5f}", # is default beta for LJ because bond_b is None ) + logger.info(f"Created Morse to LJ transition bond for {bond_key} (vanish): {self.mol_b.bonds[bond_key]}") # update bound_to - atompair = [molB.atoms[key[0]], molB.atoms[key[1]]] - atompair[0].bound_to_nrs.append(atompair[1].nr) - atompair[1].bound_to_nrs.append(atompair[0].nr) - - # Find all neighbors of the bond - neighbor_sides = [] - for idx in key: - bonds = list(filter(lambda b: idx in b, keysB)) + # TODO: this should already be done for molB, as it is the state + # for after the reaction. + # remove? + # atompair = [self.mol_b.atoms[bond_key[0]], self.mol_b.atoms[bond_key[1]]] + # atompair[0].bound_to_nrs.append(atompair[1].nr) + # atompair[1].bound_to_nrs.append(atompair[0].nr) + + if self.enable_morph_pairs: + # Find all neighbors of the bond + neighbor_sides = [] neighbor_set = set() - for b in bonds: - neighbor_set.add(b[0]) - neighbor_set.add(b[1]) - neighbor_sides.append(neighbor_set) - neighbor_set.discard(idx) # avoid double counting central bond - - # handle vanishing exclusions neighbors1 - key[1] - for a1 in neighbor_sides[0]: - pk = tuple(sorted((a1, key[1]), key=int)) - new_pairs_break[pk] = make_pair(*pk, bind=False) - - # handle vanishing exclusions neighbors2 - key[0] - for a2 in neighbor_sides[1]: - pk = tuple(sorted((a2, key[0]), key=int)) - new_pairs_break[pk] = make_pair(*pk, bind=False) - - # bond only exists in B -> create bond - elif parameterizedB: - atomtypes = [molB.atoms[atom_id].type for atom_id in key] - break_bind_atoms[key[0]] = atomtypes[0] - break_bind_atoms[key[1]] = atomtypes[1] - sigmaij, epsilonij = get_LJ_parameters(*key) - - molB.bonds[key] = Bond( - *key, + for id in bond_key: + bonds = list(filter(lambda b: id in b, bond_keys_b)) + neighbor_set = set() + for b in bonds: + neighbor_set.add(b[0]) + neighbor_set.add(b[1]) + neighbor_sides.append(neighbor_set) + + neighbor_set.discard(bond_key[1]) # avoid double counting central bond + + # bond is vanishing, which removes exclusions neighbors1 - key[1] + for a1 in neighbor_sides[0]: + pair_key = tuple(sorted((a1, bond_key[1]), key=int)) + self.affected_pairs[pair_key] = self._make_pair(*pair_key, transition=PairTransition.Create) + + # handle vanishing exclusions neighbors2 - key[0] + for a2 in neighbor_sides[1]: + pair_key = tuple(sorted((a2, bond_key[0]), key=int)) + self.affected_pairs[pair_key] = self._make_pair(*pair_key, transition=PairTransition.Create) + + elif isinstance(bond_b, (Bond, BondType)): + # bond only exists in B -> create bond + # transition from a "morse bond" that acts like a LJ potential + # to a "real" morse bond acting like the bond + if epsilonij_a == 0: + raise ValueError(f"epsilonij_a is 0 for {bond_key}") + self.mol_b.bonds[bond_key] = Bond( + *bond_key, funct=FFFUNC["morse_bond"], - c0=f"{sigmaij*1.12:.5f}", # sigmaij* 1.12 = LJ minimum - c1=f"{0.0:.5f}", # well depth is epsilonij - c2=f"{0.0:.5f}", - c3=parameterizedB.c0, # b - c4=f"{hyperparameters['morse_well_depth']:.5f}", # D - c5=f"{hyperparameters['beta']:.5f}", # beta + c0=f"{sigmaij_a*1.12:.5f}", # sigmaij* 1.12 = LJ minimum at lambda 0, where the atom types are still from A + c1=f"{epsilonij_a:.5f}", # well depth is epsilonij + c2=f"{self.default_beta_for_lj:.5f}", # since the bond didn't exist in A, beta_a is the default beta for LJ + c3=bond_b.c0, # bond distance + c4=f"{depth_b:.5f}", # D + c5=f"{beta_b:.5f}", # beta ) + logger.info(f"Created LJ to Morse transition bond for {bond_key} (create): {self.mol_b.bonds[bond_key]}") + + if self.enable_morph_pairs: + # Find all neighbors of the bond + neighbor_sides = [] + for id in bond_key: + bonds = list(filter(lambda b: id in b, bond_keys_a)) + neighbor_set = set() + for b in bonds: + neighbor_set.add(b[0]) + neighbor_set.add(b[1]) + neighbor_sides.append(neighbor_set) + neighbor_set.discard(id) # type: ignore # avoid double counting central bond + + # handle growing exclusions neighbors1 - key[1] + for a1 in neighbor_sides[0]: + pair_key = tuple(sorted((a1, bond_key[1]), key=int)) + self.new_pairs_bind[pair_key] = self._make_pair(*pair_key, transition=PairTransition.Vanish) + + # handle growing exclusions neighbors2 - key[0] + for a2 in neighbor_sides[1]: + pair_key = tuple(sorted((a2, bond_key[0]), key=int)) + self.new_pairs_bind[pair_key] = self._make_pair(*pair_key, transition=PairTransition.Vanish) + + + def merge_pairs(self): + """ + If it is for a binding event, the pair is vanishing, as it will be an + exclusion once bound. If a bond is breaking, the pair interaction + is slowly turned on, as it was excluded previously. + """ + if not self.enable_morph_pairs: + return + for key, pair in self.affected_pairs.items(): + # if break_pair is None and bind_pair is None: + # raise ValueError(f"Both break_pair and bind_pair are None for key {key}. This should be an impossible state.") + + # morph_pair = self._interpolate_two_pairs(break_pair, bind_pair) + + self.mol_b.pairs[key] = pair + # growing/shrinking pairs are used to turn on/off + # LJ / non-bonded interactions + + # Add general exclusions for each pair + # such that automatic non-bonded interactions + # don't interfer with our efforts + # gromacs does not automatically create exclusions for our our new morse bonds (type 3). + # see https://manual.gromacs.org/current/reference-manual/topologies/molecule-definition.html#excl + # "Particles are considered bonded when they are connected by “chemical” bonds ([ bonds ] types 1 to 5, 7 or 8)" + self.mol_b.exclusions[key] = Exclusion(*key) + + def merge_angles(self): + keys = set(self.mol_a.angles.keys()) | set(self.mol_b.angles.keys()) + for key in keys: + angle_a = self.mol_a.angles.get(key) + angle_b = self.mol_b.angles.get(key) + + if angle_a == angle_b: + # angles are the same, nothing to be done. + continue - # Find all neighbors of the bond - neighbor_sides = [] - for idx in key: - bonds = list(filter(lambda b: idx in b, keysA)) - neighbor_set = set() - for b in bonds: - neighbor_set.add(b[0]) - neighbor_set.add(b[1]) - neighbor_sides.append(neighbor_set) - neighbor_set.discard(idx) # type: ignore # avoid double counting central bond - - # handle growing exclusions neighbors1 - key[1] - for a1 in neighbor_sides[0]: - pk = tuple(sorted((a1, key[1]), key=int)) - new_pairs_bind[pk] = make_pair(*pk, bind=True) - - # handle growing exclusions neighbors2 - key[0] - for a2 in neighbor_sides[1]: - pk = tuple(sorted((a2, key[0]), key=int)) - new_pairs_bind[pk] = make_pair(*pk, bind=True) - - for key in set(new_pairs_bind.keys()) | set(new_pairs_break.keys()): - break_pair = new_pairs_break.get(key) - bind_pair = new_pairs_bind.get(key) - - morph_pair = merge_pairs(break_pair, bind_pair) - - molB.pairs[key] = morph_pair - molB.exclusions[key] = Exclusion(*key) - - # angles - keys = set(molA.angles.keys()) | set(molB.angles.keys()) - for key in keys: - interactionA = molA.angles.get(key) - interactionB = molB.angles.get(key) - - if interactionA != interactionB: parameterizedA = get_explicit_or_type( - key, interactionA, ff.angletypes, molA + key, angle_a, self.ff.angletypes, self.mol_a ) parameterizedB = get_explicit_or_type( - key, interactionB, ff.angletypes, molB + key, angle_b, self.ff.angletypes, self.mol_b ) - if parameterizedA and parameterizedB: - molB.angles[key] = Angle( + + if parameterizedA is not None and parameterizedB is not None: + if not (isinstance(parameterizedA, (Angle, AngleType)) and isinstance(parameterizedB, (Angle, AngleType))): + m = f"Can't find parameters for bond with key {key}, got explicits/types: {parameterizedA} and {parameterizedB} for a bonds: {angle_a}, {angle_b}" + logger.error(m) + raise ValueError(m) + # both angles exist -> transition between parameters + self.mol_b.angles[key] = Angle( *key, funct=parameterizedB.funct, c0=parameterizedA.c0, @@ -548,8 +708,9 @@ def merge_pairs(break_pair: Optional[Pair], bind_pair: Optional[Pair]): c2=parameterizedB.c0, c3=parameterizedB.c1, ) - elif parameterizedA: - molB.angles[key] = Angle( + elif isinstance(parameterizedA, (Angle, AngleType)): + # angle only exists in A -> vanish angle + self.mol_b.angles[key] = Angle( *key, funct=FFFUNC["harmonic_angle"], c0=parameterizedA.c0, @@ -557,8 +718,9 @@ def merge_pairs(break_pair: Optional[Pair], bind_pair: Optional[Pair]): c2=parameterizedA.c0, c3="0.00", ) - elif parameterizedB: - molB.angles[key] = Angle( + elif isinstance(parameterizedB, (Angle, AngleType)): + # angle only exists in B -> create angle + self.mol_b.angles[key] = Angle( *key, funct=FFFUNC["harmonic_angle"], c0=parameterizedB.c0, @@ -569,23 +731,28 @@ def merge_pairs(break_pair: Optional[Pair], bind_pair: Optional[Pair]): else: logger.warning(f"Could not parameterize angle {key}.") - # dihedrals - # proper dihedrals - # proper dihedrals have a nested structure and need a different treatment from bonds, angles and improper dihedrals - # if indices change atomtypes and parameters change because of that, it will ignore these parameter change + def merge_dihedrals(self): - keys = set(molA.proper_dihedrals.keys()) | set(molB.proper_dihedrals.keys()) - for key in keys: - multiple_dihedralsA = molA.proper_dihedrals.get(key) - multiple_dihedralsB = molB.proper_dihedrals.get(key) + # TODO: dihedrals between atoms whose types change need to be handled + # explitly - if multiple_dihedralsA != multiple_dihedralsB: - multiple_dihedralsA = get_explicit_MultipleDihedrals( - key, molA, multiple_dihedralsA, ff + # proper dihedrals + # proper dihedrals have a nested structure and need a different treatment from bonds, angles and improper dihedrals + keys = set(self.mol_a.proper_dihedrals.keys()) | set( + self.mol_b.proper_dihedrals.keys() + ) + for key in keys: + multiple_dihedralsA = self._get_explicit_MultipleDihedrals( + key=key, use_state_b=False, use_improper=False ) - multiple_dihedralsB = get_explicit_MultipleDihedrals( - key, molB, multiple_dihedralsB, ff + multiple_dihedralsB = self._get_explicit_MultipleDihedrals( + key=key, use_state_b=True, use_improper=False ) + + if multiple_dihedralsA == multiple_dihedralsB: + # dihedrals are the same, nothing to be done + continue + keysA = ( set(multiple_dihedralsA.dihedrals.keys()) if multiple_dihedralsA @@ -597,12 +764,11 @@ def merge_pairs(break_pair: Optional[Pair], bind_pair: Optional[Pair]): else set() ) - molB.proper_dihedrals[key] = MultipleDihedrals( - *key, FFFUNC["mult_proper_dihedral"], {} + self.mol_b.proper_dihedrals[key] = MultipleDihedrals( + *key, funct=FFFUNC["mult_proper_dihedral"], dihedrals={} ) periodicities = keysA | keysB for periodicity in periodicities: - assert isinstance(periodicity, str) interactionA = ( multiple_dihedralsA.dihedrals.get(periodicity) if multiple_dihedralsA @@ -614,30 +780,25 @@ def merge_pairs(break_pair: Optional[Pair], bind_pair: Optional[Pair]): else None ) - molB.proper_dihedrals[key].dihedrals[periodicity] = merge_dihedrals( - key, - interactionA, - interactionB, - ff.proper_dihedraltypes, - ff.proper_dihedraltypes, - molA, - molB, - FFFUNC["mult_proper_dihedral"], - periodicity, + self.mol_b.proper_dihedrals[key].dihedrals[periodicity] = ( + self._interpolate_two_dihedrals( + key=key, + dihedral_a=interactionA, + dihedral_b=interactionB, + funct=FFFUNC["mult_proper_dihedral"], + ) ) - # improper dihedrals - keys = set(molA.improper_dihedrals.keys()) | set(molB.improper_dihedrals.keys()) - for key in keys: - multiple_dihedralsA = molA.improper_dihedrals.get(key) - multiple_dihedralsB = molB.improper_dihedrals.get(key) - - if multiple_dihedralsA != multiple_dihedralsB: - multiple_dihedralsA = get_explicit_MultipleDihedrals( - key, molA, multiple_dihedralsA, ff + # improper dihedrals + keys = set(self.mol_a.improper_dihedrals.keys()) | set( + self.mol_b.improper_dihedrals.keys() + ) + for key in keys: + multiple_dihedralsA = self._get_explicit_MultipleDihedrals( + key=key, use_state_b=False, use_improper=True ) - multiple_dihedralsB = get_explicit_MultipleDihedrals( - key, molB, multiple_dihedralsB, ff + multiple_dihedralsB = self._get_explicit_MultipleDihedrals( + key=key, use_state_b=True, use_improper=True ) keysA = ( set(multiple_dihedralsA.dihedrals.keys()) @@ -650,8 +811,8 @@ def merge_pairs(break_pair: Optional[Pair], bind_pair: Optional[Pair]): else set() ) - molB.improper_dihedrals[key] = MultipleDihedrals( - *key, FFFUNC["mult_improper_dihedral"], {} + self.mol_b.improper_dihedrals[key] = MultipleDihedrals( + *key, funct=FFFUNC["mult_improper_dihedral"], dihedrals={} ) periodicities = keysA | keysB for periodicity in periodicities: @@ -667,47 +828,31 @@ def merge_pairs(break_pair: Optional[Pair], bind_pair: Optional[Pair]): else None ) - molB.improper_dihedrals[key].dihedrals[periodicity] = merge_dihedrals( - key, - interactionA, - interactionB, - ff.improper_dihedraltypes, - ff.improper_dihedraltypes, - molA, - molB, - FFFUNC["mult_improper_dihedral"], - periodicity, + self.mol_b.improper_dihedrals[key].dihedrals[periodicity] = ( + self._interpolate_two_dihedrals( + key=key, + dihedral_a=interactionA, + dihedral_b=interactionB, + funct=FFFUNC["mult_improper_dihedral"], + ) ) - # amber fix for breaking/binding atom types without LJ potential - for k, v in break_bind_atoms.items(): - if v in ["HW", "HO"]: - molB.atoms[k].type = "H1" - if molB.atoms[k].typeB is not None: - molB.atoms[k].typeB = "H1" - - # update is_radical attribute of Atom objects in topology - molB.find_radicals() - - return molB - def merge_top_slow_growth( - topA: Topology, topB: Topology, morph_pairs: bool + top_a: Topology, top_b: Topology, enable_morph_pairs: bool ) -> Topology: """Takes two Topologies and joins them for a smooth free-energy like parameter transition simulation. - For now this assumes that only one moleculeype is of interest. + Modifies topB in place. + All changes are contained to the `Reactive` moleculetype. """ - molA = topA.moleculetypes[REACTIVE_MOLECULEYPE] - molB = topB.moleculetypes[REACTIVE_MOLECULEYPE] - molB = merge_top_moleculetypes_slow_growth( - molA=molA, molB=molB, ff=topB.ff, morph_pairs=morph_pairs - ) - # not necessary, will be updated automatically on `to_dict()` - # topB._update_dict() - - return topB + MoleculeTypeMerger( + mol_a=top_a.moleculetypes[REACTIVE_MOLECULEYPE], + mol_b=top_b.moleculetypes[REACTIVE_MOLECULEYPE], + ff=top_b.ff, + enable_morph_pairs=enable_morph_pairs, + ).merge() + return top_b def break_bond_plumed( diff --git a/src/kimmdy/parsing.py b/src/kimmdy/parsing.py index 1e9d7b61..cfe49d43 100644 --- a/src/kimmdy/parsing.py +++ b/src/kimmdy/parsing.py @@ -575,7 +575,7 @@ def read_csv_to_list(csv_file: Path) -> list: ## Miscellaneous files -def read_edissoc(path: Path) -> dict: +def read_edissoc(path: Path) -> dict[str, dict[tuple[str, str], float]]: """Reads a edissoc file and turns it into a dict. The dissociation energy is assigned per pair of atom names. Atom names are unique to a residue, and the dict is nested by residues. @@ -602,7 +602,8 @@ def read_edissoc(path: Path) -> dict: edissocs[key] = {} elif len(l.split(sep=";")[0].split()) == 3: at1, at2, edissoc, *_ = l.split() - edissocs[key][frozenset([at1, at2])] = float(edissoc) + interaction_key = tuple(sorted([at1, at2])) + edissocs[key][interaction_key] = float(edissoc) elif len(l.strip()) > 0: logger.warning(f"Unexpected line in edissoc file: {l}") return edissocs diff --git a/src/kimmdy/recipe.py b/src/kimmdy/recipe.py index 0c3e33fd..6813323d 100644 --- a/src/kimmdy/recipe.py +++ b/src/kimmdy/recipe.py @@ -463,8 +463,15 @@ def f(top: Topology) -> Topology: @dataclass class DeferredRecipeSteps(Generic[T]): + """DeferredRecipeSteps + + Are called by the kimmdy runmanager if the recipe is chosen. + The callback is given the chosen key (generic over type T) as well + as the index into the rates (and time ranges for said rates) and + the time in ps at which the reaction occurs. + """ key: T - callback: Callable[[T, int], list[RecipeStep]] + callback: Callable[[T, int, float], list[RecipeStep]] def __eq__(self, other) -> bool: if not isinstance(other, DeferredRecipeSteps): @@ -527,7 +534,6 @@ def get_vmd_selection(self) -> str: return "" ixs = set() for rs in self.recipe_steps: - print(self.recipe_steps) if isinstance(rs, BondOperation): ixs.add(rs.atom_ix_1) ixs.add(rs.atom_ix_2) @@ -596,17 +602,14 @@ def get_recipe_name(self): name += "\u27A1" # ➡ if (ix := getattr(rs, "atom_ix_2", None)) is not None: name += str(ix) - elif isinstance(rs, Break): if (ix := getattr(rs, "atom_ix_1", None)) is not None: name += str(ix) name += "\u26A1" # ➡ if (ix := getattr(rs, "atom_ix_2", None)) is not None: name += str(ix) - elif isinstance(rs, Relax): pass - elif isinstance(rs, CustomTopMod): name += "f" diff --git a/src/kimmdy/runmanager.py b/src/kimmdy/runmanager.py index 5ee1fdbc..d0c13d8d 100644 --- a/src/kimmdy/runmanager.py +++ b/src/kimmdy/runmanager.py @@ -209,6 +209,7 @@ def __init__(self, config: Config): radicals=getattr(self.config, "radicals", None), residuetypes_path=getattr(self.config, "residuetypes", None), reactive_nrexcl=nrexcl, + gromacs_alias=self.config.gromacs_alias ) self.filehist: list[dict[str, TaskFiles]] = [ {"setup": TaskFiles(self.get_latest)} @@ -433,9 +434,9 @@ def _setup(self, files: TaskFiles) -> TaskFiles: logger.info("Writing initial topology after parsing") if self.config.parameterize_at_setup: - focus_nrs = set(self.top.atoms.keys()) + self.top.parameterization_focus_ids= set(self.top.atoms.keys()) self.top.needs_parameterization = True - self.top.update_parameters(focus_nrs) + self.top.update_parameters() write_top(self.top.to_dict(), files.outputdir / self.config.top.name) files.output["top"] = files.outputdir / self.config.top.name @@ -592,6 +593,7 @@ def _restart_from_rundir(self): ), radicals=getattr(self.config, "radicals", None), residuetypes_path=getattr(self.config, "residuetypes", None), + gromacs_alias=self.config.gromacs_alias ) def _restart_task(self, _: TaskFiles) -> None: @@ -616,6 +618,7 @@ def _run_md( files.input["mdp"] = md_config.mdp mdp = files.input["mdp"] ndx = files.input["ndx"] + logger.info(f"Using the following input files: top: {top}, gro: {gro}") # to continue MD after timeout if continue_md: @@ -671,6 +674,13 @@ def _run_md( except CalledProcessError as e: write_time_marker(files.outputdir / MARK_FAILED, "failed") logger.error(f"Error occured during MD {instance}:\n{e}") + + # TODO: debug + # attempt to write out the first frame as gro + # for easier debugging in VMD + dump_cmd = f"echo -e '0\\n' | {gmx_alias} trjconv -s {instance}.tpr -f {instance}.xtc -dump 0 -o {instance}_start.gro" + run_gmx(dump_cmd, outputdir) + raise e logger.info(f"Done with MD {instance}") @@ -786,10 +796,10 @@ def _decide_recipe( return files # Correct the offset of time_start_index by the concatenation - # of all recipes from all reactions back onto the offset + # of all rates of all recipes from all reactions back onto the offset # within the one chosen reaction plugin n_recipes_per_plugin = [ - len(v.recipes) for v in self.recipe_collections.values() + sum([len(r.rates) for r in v.recipes]) for v in self.recipe_collections.values() ] logger.info(f"Plugins: {[k for k in self.recipe_collections.keys()]}") logger.info(f"Number of recipes per reaction plugin: {n_recipes_per_plugin}") @@ -865,10 +875,10 @@ def _apply_recipe(self, files: TaskFiles) -> TaskFiles: f"Steps of recipe where deferred, calling callback with key {recipe.recipe_steps.key} and time_index {plugin_time_index}" ) recipe.recipe_steps = recipe.recipe_steps.callback( - recipe.recipe_steps.key, plugin_time_index + recipe.recipe_steps.key, plugin_time_index, ttime ) logger.info( - f"Got {len(recipe.recipe_steps)} in recipe {recipe.get_recipe_name()}" + f"Got {len(recipe.recipe_steps)} steps in recipe {recipe.get_recipe_name()}" ) else: m = f"Recipe steps of {recipe} are neither a list nor a DeferredRecipeSteps object." @@ -891,11 +901,9 @@ def _apply_recipe(self, files: TaskFiles) -> TaskFiles: truncate_sim_files(files=files, time=ttime) top_initial = deepcopy(self.top) - focus_nrs: set[str] = set() for step in recipe.recipe_steps: if isinstance(step, Break): self.top.break_bond((step.atom_id_1, step.atom_id_2)) - focus_nrs.update([step.atom_id_1, step.atom_id_2]) if hasattr(self.config, "plumed"): break_bond_plumed( files, @@ -904,7 +912,6 @@ def _apply_recipe(self, files: TaskFiles) -> TaskFiles: ) elif isinstance(step, Bind): self.top.bind_bond((step.atom_id_1, step.atom_id_2)) - focus_nrs.update([step.atom_id_1, step.atom_id_2]) elif isinstance(step, Place): task = Task( self, @@ -915,9 +922,8 @@ def _apply_recipe(self, files: TaskFiles) -> TaskFiles: place_files = task() if place_files is not None: self._discover_output_files(task.name, place_files) - if step.id_to_place is not None: - focus_nrs.update([step.id_to_place]) - + elif isinstance(step, CustomTopMod): + step.f(self.top) elif isinstance(step, Relax): logger.info("Starting relaxation md as part of reaction..") if not hasattr(self.config.changer.coordinates, "md"): @@ -925,8 +931,13 @@ def _apply_recipe(self, files: TaskFiles) -> TaskFiles: continue if self.config.changer.coordinates.slow_growth: - # Create a temporary slow growth topology for sub-task run_md, afterwards, top will be reset properly - self.top.update_parameters(focus_nrs) + # Create a temporary slow growth topology for sub-task run_md, afterwards, top will be reset properly. + + self.top.update_parameters() + + # write out original topology (topA) and target (topB) for easier debugging + write_top(top_initial.to_dict(), files.outputdir / self.config.top.name.replace(".top", "_before.top")) + write_top(self.top.to_dict(), files.outputdir / self.config.top.name.replace(".top", "_after.top")) # top_initial is still the topology before the reaction # we need to do some (temporary) changes to it to stabilize @@ -934,7 +945,7 @@ def _apply_recipe(self, files: TaskFiles) -> TaskFiles: # First we find out if there are solvent atoms among the involved atoms solvent_atoms: set[str] = set() logger.debug(f"Checking for reacting solvent residues..") - for ai in focus_nrs: + for ai in self.top.parameterization_focus_ids: if top_initial.atoms[ai].residue == "SOL": solvent_atoms.add(ai) logger.debug( @@ -967,37 +978,42 @@ def _apply_recipe(self, files: TaskFiles) -> TaskFiles: b2 = top_initial.bonds.get((ow.nr, hw2.nr)) logger.info(f"Added bonds: {b1}, {b2}") + write_top(top_initial.to_dict(), files.outputdir / self.config.top.name.replace(".top", "_before_with_solvent_bonds.top")) + top_merge = merge_top_slow_growth( - # topA was copied before parameters are updated - topA=top_initial, - # topB is parameterized for after the reaction - topB=deepcopy(self.top), - morph_pairs=self.config.changer.coordinates.slow_growth_pairs, + # top_a was copied before parameters are updated + top_a=top_initial, + # top_b is parameterized for after the reaction + # top_b is modified and returned as the merged top + # hence it must be copied here to not modify self.top + top_b=deepcopy(self.top), + enable_morph_pairs=self.config.changer.coordinates.slow_growth_pairs, ) top_merge_path = files.outputdir / self.config.top.name.replace( - ".", "_mod." + ".top", "_relax.top" ) write_top(top_merge.to_dict(), top_merge_path) + # declare this temporary topolgy the lates topology + # such that it will be used for the subsequent relax md task self.latest_files["top"] = top_merge_path - instance = self.config.changer.coordinates.md + md_instance = self.config.changer.coordinates.md task = Task( self, f=self._run_md, - kwargs={"instance": instance}, - out=instance, + kwargs={"instance": md_instance}, + out=md_instance, ) md_files = task() if md_files is not None: self._discover_output_files(task.name, md_files) - elif isinstance(step, CustomTopMod): - step.f(self.top) + logger.info(f"Updating partial charges") self.top.update_partial_charges(recipe.recipe_steps) - self.top.update_parameters(focus_nrs) + self.top.update_parameters() # this is the new topology after the reaction - # not the tempoary topology top_mod for slow_growth + # not the temporay topology top_relax for slow_growth write_top(self.top.to_dict(), files.outputdir / self.config.top.name) files.output["top"] = files.outputdir / self.config.top.name diff --git a/src/kimmdy/topology/atomic.py b/src/kimmdy/topology/atomic.py index b02002dd..2a47d4f0 100644 --- a/src/kimmdy/topology/atomic.py +++ b/src/kimmdy/topology/atomic.py @@ -4,12 +4,16 @@ See [gromacs manual](https://manual.gromacs.org/current/reference-manual/topologies/topology-file-formats.html#topology-file) """ +import logging from dataclasses import dataclass, field from typing import Optional, Union from kimmdy.constants import FFFUNC from kimmdy.utils import field_or_none +logger = logging.getLogger(__name__) + + @dataclass class MoleculeTypeHeader: @@ -58,6 +62,22 @@ def from_top_line(cls, l: list[str]): massB=field_or_none(l, 10), ) + def eq_by_params(self, other): + """Compare two atoms by thair parameters, ignoring bound_to_nrs and is_radical.""" + if not isinstance(other, Atom): + logger.warning( + f"Comparing Atoms with different types: {type(self)} and {type(other)}. Returning False." + ) + return False + + for field in ["nr", "type", "resnr", "residue", "atom", "cgnr", "charge", "mass", "typeB", "chargeB", "massB"]: + if getattr(self, field) != getattr(other, field): + return False + + return True + + + @dataclass() class PositionRestraint: diff --git a/src/kimmdy/topology/ff.py b/src/kimmdy/topology/ff.py index 1ec18b8a..cb317d8f 100644 --- a/src/kimmdy/topology/ff.py +++ b/src/kimmdy/topology/ff.py @@ -6,7 +6,7 @@ from typing import Optional from kimmdy.constants import FFFUNC -from kimmdy.parsing import read_top +from kimmdy.parsing import read_top, read_edissoc from kimmdy.topology.atomic import ( AngleId, AngleType, @@ -21,6 +21,7 @@ ResidueType, ) from kimmdy.topology.utils import get_top_section +from kimmdy.utils import get_gmx_dir logger = logging.getLogger(__name__) @@ -28,7 +29,7 @@ class FF: """Container for parsed forcefield data.""" - def __init__(self, top: dict, residuetypes_path: Optional[Path] = None): + def __init__(self, top: dict, residuetypes_path: Optional[Path] = None, gromacs_alias: str = 'gmx'): self.atomtypes: dict[AtomId, AtomType] = {} self.bondtypes: dict[BondId, BondType] = {} self.angletypes: dict[AngleId, AngleType] = {} @@ -36,9 +37,17 @@ def __init__(self, top: dict, residuetypes_path: Optional[Path] = None): self.improper_dihedraltypes: dict[ImproperDihedralId, DihedralType] = {} self.residuetypes: dict[str, ResidueType] = {} self.nonbond_params: dict[BondId, NonbondParamType] = {} + self.default_edissoc: dict[str, dict[tuple[str, str], float]] = {} ffdir: Optional[Path] = top["ffdir"] + gmxdir = get_gmx_dir(gromacs_alias) + if gmxdir is not None: + gmx_builtin_ffs = gmxdir / "top" + self.default_edissoc = read_edissoc(gmx_builtin_ffs / "edissoc.dat") + else: + logger.warning(f"Could not find gromacs data directory for {gromacs_alias}") + if defaults := get_top_section(top, "defaults"): self.defaults = defaults diff --git a/src/kimmdy/topology/topology.py b/src/kimmdy/topology/topology.py index eccb9ca7..77f6e870 100644 --- a/src/kimmdy/topology/topology.py +++ b/src/kimmdy/topology/topology.py @@ -673,6 +673,10 @@ class Topology: Path to the residue types file. reactive_nrexcl Overwrite nrexcl value for the reactive moleculetype. Otherwise takes the nrexcl of the first reactive moleculetype. + needs_parameterization + Does the topology currently need to be re-parameterized due to changes? + parameterization_focus_ids + list of atoms ids around which the parameterization happens (to avoid re-parameterizing the whole Reactive moleculetype) """ def __init__( @@ -683,6 +687,7 @@ def __init__( radicals: Optional[str] = None, residuetypes_path: Optional[Path] = None, reactive_nrexcl: Optional[str] = None, + gromacs_alias: str = "gmx", ) -> None: if top == {}: raise NotImplementedError( @@ -701,7 +706,8 @@ def __init__( self.parametrizer = parametrizer self._check_is_reactive_molecule = is_reactive_predicate_f - self.needs_parameterization = False + self.needs_parameterization: bool = False + self.parameterization_focus_ids: set[str] = set() self._merge_moleculetypes(radicals=radicals, nrexcl=reactive_nrexcl) self._link_atomics() @@ -964,13 +970,14 @@ def _update_dict(self): [attributes_to_list(x) for x in self.ff.angletypes.values()], ) - def update_parameters(self, focus_nrs: Optional[set[str]] = None): + def update_parameters(self): if self.needs_parameterization: if self.parametrizer is not None: + focus_ids = self.parameterization_focus_ids logger.info( - f"Starting parametrization using {self.parametrizer.__class__.__name__}" + f"Starting parametrization using {self.parametrizer.__class__.__name__} with focus on {focus_ids}" ) - self.parametrizer.parameterize_topology(self, focus_nrs) + self.parametrizer.parameterize_topology(self, focus_nrs=focus_ids) else: raise RuntimeError("No Parametrizer was initialized in this topology!") self.needs_parameterization = False @@ -1131,6 +1138,7 @@ def del_atom( self.update_parameters() # Overwriting in case of no parameterization wanted self.needs_parameterization = False + self.parameterization_focus_ids = set() return update_map @@ -1222,6 +1230,7 @@ def break_bond( ) self.needs_parameterization = True + self.parameterization_focus_ids.update(atompair_nrs) # mark atoms as radicals for atom in atompair: @@ -1315,6 +1324,7 @@ def bind_bond( ) self.needs_parameterization = True + self.parameterization_focus_ids.update(atompair_nrs) # de-radialize if re-combining two radicals if all(map(lambda x: x.is_radical, atompair)): @@ -1468,3 +1478,5 @@ def bind_bond( if settles is not None: logger.info(f"Removing settles {ai}") reactive_moleculetype.settles.pop(ai) + + diff --git a/src/kimmdy/utils.py b/src/kimmdy/utils.py index 55ba46bb..44012f2d 100644 --- a/src/kimmdy/utils.py +++ b/src/kimmdy/utils.py @@ -457,24 +457,14 @@ def truncate_sim_files( f"Truncating trajectories to {time:.4} ps. Trajectory time was {last_time:.4} ps" ) - # backup the tails of trajectories - for trj in trjs: - tmp = trj.rename(trj.with_name("tmp_backup_" + trj.name)) - if keep_tail: - run_gmx( - f"gmx trjconv -f {tmp} -b {time} -o {trj}", - ) - trj.rename(str(trj) + ".tail") - - run_gmx(f"gmx trjconv -f {tmp} -e {time} -o {trj}") - tmp.unlink() # backup the gro bck_gro = paths["gro"].rename( paths["gro"].with_name("tmp_backup_" + paths["gro"].name) ) + # write out gro at the specified time sp.run( - f"gmx trjconv -f {trjs[0]} -s {bck_gro} -dump -1 -o {paths['gro']}", + f"gmx trjconv -f {trjs[0]} -s {bck_gro} -dump {time} -o {paths['gro']}", text=True, input="0", shell=True, @@ -483,6 +473,19 @@ def truncate_sim_files( if not keep_tail: bck_gro.unlink() + # backup the tails of trajectories + for trj in trjs: + tmp = trj.rename(trj.with_name("tmp_backup_" + trj.name)) + if keep_tail: + run_gmx( + f"gmx trjconv -f {tmp} -b {time} -o {trj}", + ) + trj.rename(str(trj) + ".tail") + + run_gmx(f"gmx trjconv -f {tmp} -e {time} -o {trj}") + tmp.unlink() + + # backup the edr if paths["edr"] is not None: bck_edr = paths["edr"].rename( diff --git a/tests/test_coordinates.py b/tests/test_coordinates.py index 885743da..7e9a2e42 100644 --- a/tests/test_coordinates.py +++ b/tests/test_coordinates.py @@ -1,3 +1,4 @@ +import logging import pytest from pathlib import Path import re @@ -121,25 +122,15 @@ def test_plumed_break(arranged_tmp_path): ) -def test_merge_prm_top(arranged_tmp_path): +def test_merge_prm_top(arranged_tmp_path, caplog): """this tests a topology merge for a HAT reaction from a Ca (nr 19) radical to a N (nr 26) radical""" - top_A = Topology(read_top(Path("topol_stateA.top"))) - top_B = Topology(read_top(Path("topol_stateB.top"))) + top_a = Topology(read_top(Path("topol_stateA.top"))) + top_b = Topology(read_top(Path("topol_stateB.top"))) top_merge_ref = Topology(read_top(Path("topol_FEP.top"))) - top_merge = merge_top_slow_growth(top_A, top_B, morph_pairs=True) - - assert top_merge.atoms == top_merge_ref.atoms - assert top_merge.bonds.keys() == top_merge_ref.bonds.keys() - assert top_merge.angles.keys() == top_merge_ref.angles.keys() - assert top_merge.pairs.keys() == top_merge_ref.pairs.keys() - assert top_merge.exclusions.keys() == top_merge_ref.exclusions.keys() - assert top_merge.proper_dihedrals.keys() == top_merge_ref.proper_dihedrals.keys() - assert ( - top_merge.improper_dihedrals.keys() == top_merge_ref.improper_dihedrals.keys() - ) - assert len(top_merge.exclusions) == 7 + caplog.set_level(logging.INFO) + top_merge = merge_top_slow_growth(top_a=top_a, top_b=top_b, enable_morph_pairs=True) assert top_merge.bonds[("19", "27")].funct == "3" assert top_merge.bonds[("26", "27")].funct == "3" @@ -148,7 +139,25 @@ def test_merge_prm_top(arranged_tmp_path): assert ( top_merge.improper_dihedrals[("17", "20", "19", "24")].dihedrals["2"].c5 == "2" ) - # assert one dihedral merge improper/proper + + assert top_merge.proper_dihedrals == top_merge_ref.proper_dihedrals + assert top_merge.improper_dihedrals == top_merge_ref.improper_dihedrals + + # assert top_merge.atoms == top_merge_ref.atoms + # more detailed output: + for k,ref in top_merge_ref.atoms.items(): + new = top_merge.atoms[k] + assert ref == new + + # assert top_merge.bonds == top_merge_ref.bonds + # more detailed output: + for k,ref in top_merge_ref.bonds.items(): + new = top_merge.bonds[k] + assert ref == new + + assert top_merge.angles == top_merge_ref.angles + assert top_merge.pairs == top_merge_ref.pairs + assert top_merge.exclusions == top_merge_ref.exclusions @pytest.mark.require_gmx diff --git a/tests/test_files/test_coordinates/topol_FEP.top b/tests/test_files/test_coordinates/topol_FEP.top index 6437dc11..6541e2a1 100644 --- a/tests/test_files/test_coordinates/topol_FEP.top +++ b/tests/test_files/test_coordinates/topol_FEP.top @@ -847,7 +847,7 @@ Reactive 3 24 C 3 ALA C 24 0.5973 12.01 25 O 3 ALA O 25 -0.5679 16 26 N 4 ALA N 26 -0.4157 14.01 N -0.600 14.01 -27 H1 4 ALA HA 27 0.2719 1.008 +27 H 4 ALA HA 27 0.2719 1.008 H1 0.2719 1.008 28 CT 4 ALA CA 28 0.0337 12.01 CT 0.2180 12.01 29 H1 4 ALA HA 29 0.0823 1.008 30 CT 4 ALA CB 30 -0.1825 12.01 diff --git a/tests/test_files/test_topology/topol_stateA.top b/tests/test_files/test_topology/topol_stateA.top index 9fcaf2a2..39b246da 100644 --- a/tests/test_files/test_topology/topol_stateA.top +++ b/tests/test_files/test_topology/topol_stateA.top @@ -94,9 +94,9 @@ Protein 3 15 16 1 15 17 1 17 18 1 -17 19 1 0.13600 282001.600000 ; patched parameter -19 20 1 0.14955 259408.000000 ; patched parameter -19 24 1 0.14916 265265.600000 ; patched parameter +17 19 1 0.13600 282001.600000 +19 20 1 0.14955 259408.000000 +19 24 1 0.14916 265265.600000 20 21 1 20 22 1 20 23 1 diff --git a/tests/test_files/test_topology/topol_stateB.top b/tests/test_files/test_topology/topol_stateB.top index 9b5b8312..2ad34331 100644 --- a/tests/test_files/test_topology/topol_stateB.top +++ b/tests/test_files/test_topology/topol_stateB.top @@ -97,13 +97,15 @@ Protein 3 17 19 1 0.14490 282001.6 19 20 1 0.15260 259408.0 19 24 1 0.15220 265265.6 -19 27 3 0.10900 400.0 19.0 +; this should never be in topB, we create +; those automatically only during merging +; 19 27 3 0.10900 400.0 19.0 20 21 1 20 22 1 20 23 1 24 25 1 -24 26 1 0.13883 410032.000000 ; patched parameter -26 28 1 0.14200 282001.600000 ; patched parameter +24 26 1 0.13883 410032.000000 +26 28 1 0.14200 282001.600000 28 29 1 28 30 1 28 34 1 diff --git a/tests/test_integration.py b/tests/test_integration.py index bac59f11..94013b00 100644 --- a/tests/test_integration.py +++ b/tests/test_integration.py @@ -87,7 +87,8 @@ def test_grappa_partial_parameterization(arranged_tmp_path): top_del.update_parameters() top_del.break_bond(("1", "5")) top_del.needs_parameterization = True - top_del.update_parameters(focus_nrs=set(["1", "5"])) + top_del.parameterization_focus_ids = set(["1", "5"]) + top_del.update_parameters() # check values are changed around homolysis atoms assert top.bonds[("5", "6")] != top_del.bonds[("5", "6")] diff --git a/tests/test_kmc.py b/tests/test_kmc.py index f366fd62..2e8fe6a7 100644 --- a/tests/test_kmc.py +++ b/tests/test_kmc.py @@ -207,7 +207,7 @@ def test_compare_extrande_extrande_mod(recipe_collection): assert abs(ext_rs.mean() - extmod_rs.mean()) < 0.002 -def test_total_index_to_index_within_plugin(): +def test_total_index_to_index_within_plugin_for_multiple_plugins(): ns = [3, 2, 4, 1] assert total_index_to_index_within_plugin(0, ns) == 0 assert total_index_to_index_within_plugin(1, ns) == 1 @@ -215,3 +215,9 @@ def test_total_index_to_index_within_plugin(): assert total_index_to_index_within_plugin(3, ns) == 0 assert total_index_to_index_within_plugin(4, ns) == 1 assert total_index_to_index_within_plugin(5, ns) == 0 + +def test_total_index_to_index_within_plugin_for_one_plugin(): + ns = [5] + assert total_index_to_index_within_plugin(0, ns) == 0 + assert total_index_to_index_within_plugin(1, ns) == 1 + assert total_index_to_index_within_plugin(2, ns) == 2 diff --git a/tests/test_topology.py b/tests/test_topology.py index fff58b70..58570e7d 100644 --- a/tests/test_topology.py +++ b/tests/test_topology.py @@ -168,6 +168,18 @@ def test_match_atomic_item_to_atomic_type(self, top_fix): class TestUrea: + def test_find_edissoc(self, raw_urea_top_fix): + raw = deepcopy(raw_urea_top_fix) + top = Topology(raw) + assert top.ff.default_edissoc != {} + assert len(top.ff.default_edissoc.keys()) == 1 + assert top.ff.default_edissoc["_"] is not None + print(top.ff.default_edissoc['_'].keys()) + assert top.ff.default_edissoc["_"][('C', 'O')] == 743 + assert top.ff.default_edissoc["_"][('H', 'O')] == 463 + print(top.ff.default_edissoc) + + def test_urea(self, raw_urea_top_fix): raw = deepcopy(raw_urea_top_fix) top = Topology(raw)