diff --git a/frustratometer/classes/AWSEM.py b/frustratometer/classes/AWSEM.py index 770f4ad..5131fbd 100644 --- a/frustratometer/classes/AWSEM.py +++ b/frustratometer/classes/AWSEM.py @@ -5,9 +5,9 @@ from .Gamma import Gamma from pydantic import BaseModel, Field, ConfigDict from pydantic.types import Path -from typing import List,Optional,Union +from typing import List,Optional,Union,Generator -__all__ = ['AWSEM'] +__all__ = ['AWSEM','AWSEMIndicators','DecoyEnsemble'] class AWSEMParameters(BaseModel): model_config = ConfigDict(extra='ignore', arbitrary_types_allowed=True) @@ -37,7 +37,6 @@ class AWSEMParameters(BaseModel): r_maxII: float = Field(9.5, description="Maximum distance for mediated contact potential. (Angstrom)") eta_sigma: float = Field(7.0, description="Sharpness of the density-based switching function between protein-mediated and water-mediated contacts.") - #Membrane membrane_gamma: Union[Path,Gamma] = Field(_path/'data'/'AWSEM_membrane_2015.json', description="File or Gamma object containing the membrane Gamma values (for membrane proteins)") eta_switching: int = Field(10, description="Switching distance for the membrane switching function") @@ -46,16 +45,18 @@ class AWSEMParameters(BaseModel): min_sequence_separation_electrostatics: Optional[int] = Field(1, description="Minimum sequence separation for electrostatics calculation.") k_electrostatics: float = Field(17.3636, description="Coefficient for electrostatic interactions. (kJ/mol)") electrostatics_screening_length: float = Field(10, description="Screening length for electrostatic interactions. (Angstrom)") + charges: np.array = Field(np.array([0, 1, 0, -1, 0, 0, -1, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0]), description="Charge on each residue type") + # ['A', 'R', 'N', 'D', 'C', 'Q', 'E', 'G', 'H', 'I', 'L', 'K', 'M', 'F', 'P', 'S', 'T', 'W', 'Y', 'V'] + +class AWSEMBase(Frustratometer): -class AWSEM(Frustratometer): #Mapping to DCA q = 20 aa_map_awsem_list = [0, 0, 4, 3, 6, 13, 7, 8, 9, 11, 10, 12, 2, 14, 5, 1, 15, 16, 19, 17, 18] #A gap has no energy aa_map_awsem_x, aa_map_awsem_y = np.meshgrid(aa_map_awsem_list, aa_map_awsem_list, indexing='ij') def __init__(self, - pdb_structure: object, - sequence: str =None, + sequence: str, expose_indicator_functions: bool=False, **parameters)->object: """ @@ -63,10 +64,8 @@ def __init__(self, Parameters ---------- - pdb_structure : object - Structure object generated by Structure class - sequence : str - The amino acid sequence of the protein. The sequence is assumed to be in one-letter code. + sequence: str + The amino acid sequence expose_indicator_functions: bool If set to True, indicator functions of the contact and burial energy terms can be accessed by user. @@ -75,7 +74,15 @@ def __init__(self, AWSEM object """ - #Set attributes + # set sequence based on argument + self.N = len(sequence) + self.sequence = sequence + + # set indicator function exposure based on argument + # i guess not exposing indicator functions saves memory? + self.expose_indicator_functions = expose_indicator_functions + + # parse other arguments p = AWSEMParameters(**parameters) if p.min_sequence_separation_contact is None: p.min_sequence_separation_contact = 1 @@ -83,171 +90,238 @@ def __init__(self, p.min_sequence_separation_rho = 1 if p.min_sequence_separation_electrostatics is None: p.min_sequence_separation_electrostatics = 1 - for field, value in p: setattr(self, field, value) - - #Gamma parameters - if isinstance(p.gamma, Gamma): - gamma = p.gamma - elif isinstance(p.gamma, Path): - gamma = Gamma(p.gamma) + self.p = p + + # set gamma + if isinstance(self.p.gamma, Gamma): + gamma = self.p.gamma + elif isinstance(self.p.gamma, Path): + gamma = Gamma(self.p.gamma) else: raise ValueError("Gamma parameter must be a path or a Gamma object.") - self.gamma=gamma self.burial_gamma = gamma['Burial'].T self.direct_gamma = gamma['Direct'][0] self.protein_gamma = gamma['Protein'][0] self.water_gamma = gamma['Water'][0] - self.burial_in_context=p.burial_in_context - #Structure details - self.full_to_aligned_index_dict=pdb_structure.full_to_aligned_index_dict - if sequence is None: - self.sequence=pdb_structure.sequence + # set other attributes + self.burial_in_context = self.p.burial_in_context + self.aa_freq = frustration.compute_aa_freq(self.sequence) + self.contact_freq = frustration.compute_contact_freq(self.sequence) + charges2 = self.p.charges[:,np.newaxis] * self.p.charges[np.newaxis,:] + if self.p.k_electrostatics != 0: + self.sequence_cutoff=min(self.p.min_sequence_separation_electrostatics, self.p.min_sequence_separation_contact) + self.distance_cutoff=None # the distance matrix isn't guaranteed to exist in all subclasses, + # but it doesn't hurt to define the distance_cutoff attribute-- + # it's just like any other parameter, such as sequence_cutoff, + # that only matters if we need to compute a mask from a distance matrix + self.electrostatics_gamma = -self.p.k_electrostatics * charges2[np.newaxis, np.newaxis, :, :] else: - self.sequence=sequence - self.structure=pdb_structure.structure - self.chain=pdb_structure.chain - self.pdb_file=pdb_structure.pdb_file - self.init_index_shift=pdb_structure.init_index_shift - self.distance_matrix=pdb_structure.distance_matrix - self.full_pdb_distance_matrix=pdb_structure.full_pdb_distance_matrix - selection_CB = self.structure.select('name CB or (resname GLY IGL and name CA)') + self.sequence_cutoff=self.p.min_sequence_separation_contact + self.distance_cutoff=self.p.distance_cutoff_contact # the distance matrix isn't guaranteed to exist in all subclasses, + # but it doesn't hurt to define the distance_cutoff attribute-- + # it's just like any other parameter, such as sequence_cutoff, + # that only matters if we need to compute a mask from a distance matrix + self.charges2 = charges2 + # ?????? + self._decoy_fluctuation = {} # don't know what this does + self.minimally_frustrated_threshold=.78 # this should be a class variable or an argument to __init__ - resid = selection_CB.getResindices() - self.resid=resid - self.N=len(self.resid) - assert self.N == len(self.sequence), "The pdb is incomplete. Try setting 'repair_pdb=True' when constructing the Structure object." + def setup_model(self): + self.calculate_indicators() + self.calculate_energy_and_potts() + + def calculate_indicators(self): + raise NotImplementedError("Subclasses must this method") + + def calculate_energy_and_potts(self): + + J_index = np.meshgrid(range(self.N), range(self.N), range(self.q), range(self.q), indexing='ij', sparse=False) + h_index = np.meshgrid(range(self.N), range(self.q), indexing='ij', sparse=False) + + # compute burial and contact energies + self.burial_energy = 0.5 * self.p.k_contact * self.burial_gamma[h_index[1]] * self.burial_indicator[:, np.newaxis, :] + direct = self.direct_indicator * self.direct_gamma[J_index[2], J_index[3]] + water_mediated = self.water_indicator * self.water_gamma[J_index[2], J_index[3]] + protein_mediated = self.protein_indicator * self.protein_gamma[J_index[2], J_index[3]] + contact_energy = self.p.k_contact * np.array([direct, water_mediated, protein_mediated]) * self.sequence_mask_contact[np.newaxis, :, :, np.newaxis, np.newaxis] + + # Compute electrostatics and add to contact energy + if self.p.k_electrostatics!=0: + electrostatics_energy = self.electrostatics_gamma * self.electrostatics_indicator[:,:,np.newaxis,np.newaxis] + contact_energy = np.append(contact_energy, electrostatics_energy[np.newaxis,:,:,:,:], axis=0) + + self.contact_energy = contact_energy + # Compute potts model + self.potts_model = {} + self.potts_model['h'] = self.burial_energy.sum(axis=-1)[:, self.aa_map_awsem_list] + self.potts_model['J'] = self.contact_energy.sum(axis=0)[:, :, self.aa_map_awsem_x, self.aa_map_awsem_y] + # Set the gap energy to zero + self.potts_model['h'][:, 0] = 0 + self.potts_model['J'][:, :, 0, :] = 0 + self.potts_model['J'][:, :, :, 0] = 0 + self._native_energy=None # don't know what this does + + + def compute_configurational_decoy_statistics(self): + raise NotImplementedError("Subclasses must define this method") + + def compute_configurational_energies(self): + raise NotImplementedError("Subclasses must define this method") + + def configurational_frustration(self,aa_freq=None, correction=0, n_decoys=4000): + mean_decoy_energy, std_decoy_energy = self.compute_configurational_decoy_statistics(n_decoys=n_decoys,aa_freq=aa_freq) + return -(self.compute_configurational_energies()-mean_decoy_energy)/(std_decoy_energy+correction) + + +class AWSEM(AWSEMBase): + + def __init__(self, + pdb_structure: object, + sequence: str =None, + expose_indicator_functions: bool=False, + **parameters)->object: + # assume the user wanted the sequence from the pdb structure if not given + if not sequence: + sequence = pdb_structure.sequence + # load structure-independent parameters and methods + super().__init__(sequence, expose_indicator_functions, **parameters) + # set up strucure + self.setup_structure(pdb_structure) + # calculate masks if self.burial_in_context==True: selected_matrix=self.full_pdb_distance_matrix else: selected_matrix=self.distance_matrix - sequence_mask_rho = frustration.compute_mask(selected_matrix, + self.sequence_mask_rho = frustration.compute_mask(selected_matrix, maximum_contact_distance=None, - minimum_sequence_separation = p.min_sequence_separation_rho) - sequence_mask_contact = frustration.compute_mask(self.distance_matrix, - maximum_contact_distance=p.distance_cutoff_contact, - minimum_sequence_separation = p.min_sequence_separation_contact) - - self._decoy_fluctuation = {} - self.minimally_frustrated_threshold=.78 + minimum_sequence_separation = self.p.min_sequence_separation_rho) + self.sequence_mask_contact = frustration.compute_mask(self.distance_matrix, + maximum_contact_distance=self.p.distance_cutoff_contact, + minimum_sequence_separation = self.p.min_sequence_separation_contact) + self.electrostatics_mask = frustration.compute_mask(self.distance_matrix, + maximum_contact_distance=None, + minimum_sequence_separation=self.p.min_sequence_separation_electrostatics) + with open('my_data.txt','w') as f: + f.write(f"self.distance_cutoff: {self.distance_cutoff}\n") + f.write(f"self.sequence_cutoff: {self.sequence_cutoff}\n") + np.save('my_distance_matrix.npy',self.distance_matrix) + self.mask = frustration.compute_mask(self.distance_matrix, + maximum_contact_distance=self.distance_cutoff, + minimum_sequence_separation = self.sequence_cutoff) + np.save('my_mask_new.npy',self.mask) + self.selected_matrix = selected_matrix # we'll need this in the calculate_indicators function + self.setup_model() + + def setup_structure(self, pdb_structure): + # check structure + selection_CB = pdb_structure.structure.select('name CB or (resname GLY IGL and name CA)') + resid = selection_CB.getResindices() + N=len(resid) + self.resid = resid + self.N = N + # set structure-dependent proterties + self._pdb_structure = pdb_structure + self.structure=pdb_structure.structure + self.chain=pdb_structure.chain + self.pdb_file=pdb_structure.pdb_file + self.init_index_shift=pdb_structure.init_index_shift + self.full_to_aligned_index_dict=pdb_structure.full_to_aligned_index_dict + self.distance_matrix=pdb_structure.distance_matrix + self.full_pdb_distance_matrix=pdb_structure.full_pdb_distance_matrix + @property + def pdb_structure(self): + return self._pdb_structure + @pdb_structure.setter + def pdb_structure(self,pdb_structure): + # reset structural attributes + self.setup_structure(pdb_structure) + # check that our new structure is compatible with our old one + if self.N != len(self.sequence): + import pdb; pdb.set_trace() + raise ValueError("The pdb is incomplete. Try setting 'repair_pdb=True' when constructing the Structure object.") + self.calculate_indicators() + def change_conformation(alternative_pdb_structure): + # this function is an alias for the pdb_structure setter + self.pdb_structure = alternative_pdb_structure + + def calculate_indicators(self): # Calculate rho rho = 0.25 - rho *= (1 + np.tanh(p.eta * (selected_matrix- p.r_min))) - rho *= (1 + np.tanh(p.eta * (p.r_max - selected_matrix))) - rho *= sequence_mask_rho + rho *= (1 + np.tanh(self.p.eta * (self.selected_matrix - self.p.r_min))) + rho *= (1 + np.tanh(self.p.eta * (self.p.r_max - self.selected_matrix))) + rho *= self.sequence_mask_rho self.rho=rho - #Calculate sigma water rho_r = (rho).sum(axis=1) if self.full_pdb_distance_matrix.shape!=self.distance_matrix.shape: if self.burial_in_context==True: - self.init_index_shift=pdb_structure.init_index_shift - self.fin_index_shift=pdb_structure.fin_index_shift + self.init_index_shift=self.pdb_structure.init_index_shift + self.fin_index_shift=self.pdb_structure.fin_index_shift rho_r=rho_r[self.init_index_shift:self.fin_index_shift] self.rho_r=rho_r rho_b = np.expand_dims(rho_r, 1) rho1 = np.expand_dims(rho_r, 0) rho2 = np.expand_dims(rho_r, 1) - sigma_water = 0.25 * (1 - np.tanh(p.eta_sigma * (rho1 - p.rho_0))) * (1 - np.tanh(p.eta_sigma * (rho2 - p.rho_0))) + sigma_water = 0.25 * (1 - np.tanh(self.p.eta_sigma * (rho1 - self.p.rho_0))) * (1 - np.tanh(self.p.eta_sigma * (rho2 - self.p.rho_0))) sigma_protein = 1 - sigma_water - #Calculate theta and indicators - theta = 0.25 * (1 + np.tanh(p.eta * (self.distance_matrix - p.r_min))) * (1 + np.tanh(p.eta * (p.r_max - self.distance_matrix))) - thetaII = 0.25 * (1 + np.tanh(p.eta * (self.distance_matrix - p.r_minII))) * (1 + np.tanh(p.eta * (p.r_maxII - self.distance_matrix))) - burial_indicator = np.tanh(p.burial_kappa * (rho_b - p.burial_ro_min)) + np.tanh(p.burial_kappa * (p.burial_ro_max - rho_b)) + theta = 0.25 * (1 + np.tanh(self.p.eta * (self.distance_matrix - self.p.r_min))) * (1 + np.tanh(self.p.eta * (self.p.r_max - self.distance_matrix))) + thetaII = 0.25 * (1 + np.tanh(self.p.eta * (self.distance_matrix - self.p.r_minII))) * (1 + np.tanh(self.p.eta * (self.p.r_maxII - self.distance_matrix))) + burial_indicator = np.tanh(self.p.burial_kappa * (rho_b - self.p.burial_ro_min)) + np.tanh(self.p.burial_kappa * (self.p.burial_ro_max - rho_b)) direct_indicator = theta[:, :, np.newaxis, np.newaxis] water_indicator = thetaII[:, :, np.newaxis, np.newaxis] * sigma_water[:, :, np.newaxis, np.newaxis] protein_indicator = thetaII[:, :, np.newaxis, np.newaxis] * sigma_protein[:, :, np.newaxis, np.newaxis] - - if expose_indicator_functions: - self.indicators=[] - self.indicators.append(burial_indicator[:,0]) - self.indicators.append(burial_indicator[:,1]) - self.indicators.append(burial_indicator[:,2]) - - self.indicators.append(direct_indicator[:,:,0,0]*sequence_mask_contact) - self.indicators.append(protein_indicator[:,:,0,0]*sequence_mask_contact) - self.indicators.append(water_indicator[:,:,0,0]*sequence_mask_contact) - - self.gamma_array=[] - temp_burial_gamma=self.burial_gamma[self.aa_map_awsem_list] - temp_burial_gamma[0]=0 - temp_burial_gamma *= -0.5 * p.k_contact - self.gamma_array.append(temp_burial_gamma[:,0]) - self.gamma_array.append(temp_burial_gamma[:,1]) - self.gamma_array.append(temp_burial_gamma[:,2]) - - for contact_gamma in [self.direct_gamma, self.protein_gamma, self.water_gamma]: - temp_gamma = contact_gamma[self.aa_map_awsem_x, self.aa_map_awsem_y].copy() - temp_gamma[0, :] = 0 - temp_gamma[:, 0] = 0 - temp_gamma *= -0.5 * self.k_contact - self.gamma_array.append(temp_gamma) - - self.burial_indicator = burial_indicator - self.direct_indicator = direct_indicator - self.water_indicator = water_indicator - self.protein_indicator = protein_indicator - - - J_index = np.meshgrid(range(self.N), range(self.N), range(self.q), range(self.q), indexing='ij', sparse=False) - h_index = np.meshgrid(range(self.N), range(self.q), indexing='ij', sparse=False) - - #Burial energy - burial_energy = 0.5 * p.k_contact * self.burial_gamma[h_index[1]] * burial_indicator[:, np.newaxis, :] - self.burial_energy = burial_energy - - #Contact energy - direct = direct_indicator * self.direct_gamma[J_index[2], J_index[3]] - water_mediated = water_indicator * self.water_gamma[J_index[2], J_index[3]] - protein_mediated = protein_indicator * self.protein_gamma[J_index[2], J_index[3]] - contact_energy = p.k_contact * np.array([direct, water_mediated, protein_mediated]) * sequence_mask_contact[np.newaxis, :, :, np.newaxis, np.newaxis] - - # Compute electrostatics - if p.k_electrostatics!=0: - self.sequence_cutoff=min(p.min_sequence_separation_electrostatics, p.min_sequence_separation_contact) - self.distance_cutoff=None - - - electrostatics_mask = frustration.compute_mask(self.distance_matrix, maximum_contact_distance=None, minimum_sequence_separation=p.min_sequence_separation_electrostatics) - # ['A', 'R', 'N', 'D', 'C', 'Q', 'E', 'G', 'H', 'I', 'L', 'K', 'M', 'F', 'P', 'S', 'T', 'W', 'Y', 'V'] - charges = np.array([0, 1, 0, -1, 0, 0, -1, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0]) - charges2 = charges[:,np.newaxis]*charges[np.newaxis,:] - - electrostatics_indicator = 1 / (self.distance_matrix + 1E-6) * np.exp(-self.distance_matrix / p.electrostatics_screening_length) * electrostatics_mask - electrostatics_energy = -p.k_electrostatics * (charges2[np.newaxis,np.newaxis,:,:]*electrostatics_indicator[:,:,np.newaxis,np.newaxis]) - - contact_energy = np.append(contact_energy, electrostatics_energy[np.newaxis,:,:,:,:], axis=0) - if expose_indicator_functions: - self.indicators.append(electrostatics_indicator) - temp_gamma=0.5 * p.k_electrostatics * charges2[self.aa_map_awsem_x, self.aa_map_awsem_y] - temp_gamma[0,:]=0 - temp_gamma[:,0]=0 - self.gamma_array.append(temp_gamma) - else: - self.sequence_cutoff=p.min_sequence_separation_contact - self.distance_cutoff=p.distance_cutoff_contact - self.mask = frustration.compute_mask(self.distance_matrix, maximum_contact_distance=self.distance_cutoff, minimum_sequence_separation = self.sequence_cutoff) - - self.contact_energy = contact_energy - - # Compute fast properties - self.aa_freq = frustration.compute_aa_freq(self.sequence) - self.contact_freq = frustration.compute_contact_freq(self.sequence) - self.potts_model = {} - self.potts_model['h'] = burial_energy.sum(axis=-1)[:, self.aa_map_awsem_list] - self.potts_model['J'] = contact_energy.sum(axis=0)[:, :, self.aa_map_awsem_x, self.aa_map_awsem_y] - - # Set the gap energy to zero - self.potts_model['h'][:, 0] = 0 - self.potts_model['J'][:, :, 0, :] = 0 - self.potts_model['J'][:, :, :, 0] = 0 - self._native_energy=None + # store indicators and gammas for our particular sequence as attributes + self.indicators=[] + self.indicators.append(burial_indicator[:,0]) + self.indicators.append(burial_indicator[:,1]) + self.indicators.append(burial_indicator[:,2]) + self.indicators.append(direct_indicator[:,:,0,0]*self.sequence_mask_contact) + self.indicators.append(protein_indicator[:,:,0,0]*self.sequence_mask_contact) + self.indicators.append(water_indicator[:,:,0,0]*self.sequence_mask_contact) + self.gamma_array=[] + temp_burial_gamma=self.burial_gamma[self.aa_map_awsem_list] + temp_burial_gamma[0]=0 + temp_burial_gamma *= -0.5 * self.p.k_contact + self.gamma_array.append(temp_burial_gamma[:,0]) + self.gamma_array.append(temp_burial_gamma[:,1]) + self.gamma_array.append(temp_burial_gamma[:,2]) + for contact_gamma in [self.direct_gamma, self.protein_gamma, self.water_gamma]: + temp_gamma = contact_gamma[self.aa_map_awsem_x, self.aa_map_awsem_y].copy() + temp_gamma[0, :] = 0 + temp_gamma[:, 0] = 0 + temp_gamma *= -0.5 * self.k_contact + self.gamma_array.append(temp_gamma) + self.burial_indicator = burial_indicator # probably could get rid of either this or indicators list + self.direct_indicator = direct_indicator # probably could get rid of either this or indicators list + self.water_indicator = water_indicator # probably could get rid of either this or indicators list + self.protein_indicator = protein_indicator # probably could get rid of either this or indicators list + if self.p.k_electrostatics != 0: + electrostatics_indicator = 1 / (self.distance_matrix + 1E-6) * np.exp(-self.distance_matrix / self.p.electrostatics_screening_length) * self.electrostatics_mask + self.indicators.append(electrostatics_indicator) + self.electrostatics_indicator = electrostatics_indicator # probably could get rid of either this or indicators list + temp_gamma = 0.5 * self.p.k_electrostatics * self.charges2[self.aa_map_awsem_x, self.aa_map_awsem_y] + temp_gamma[0,:]=0 + temp_gamma[:,0]=0 + self.gamma_array.append(temp_gamma) + + def calculate_energy_and_potts(self): + super().calculate_energy_and_potts() + if not self.expose_indicator_functions: + del self.burial_indicator + del self.direct_indicator + del self.water_indicator + del self.protein_indicator + if "electrostatics_indicator" in dir(self): + # won't exist if electrostatics are turned off + del self.electrostatics_indicator + del self.indicators def compute_configurational_decoy_statistics(self, n_decoys=4000,aa_freq=None): # ['A', 'R', 'N', 'D', 'C', 'Q', 'E', 'G', 'H', 'I', 'L', 'K', 'M', 'F', 'P', 'S', 'T', 'W', 'Y', 'V'] @@ -364,6 +438,247 @@ def compute_configurational_energies(self): # import pandas as pd return configurational_energies #, pd.DataFrame(decoy_data, columns=decoy_data_columns) - def configurational_frustration(self,aa_freq=None, correction=0, n_decoys=4000): - mean_decoy_energy, std_decoy_energy = self.compute_configurational_decoy_statistics(n_decoys=n_decoys,aa_freq=aa_freq) - return -(self.compute_configurational_energies()-mean_decoy_energy)/(std_decoy_energy+correction) \ No newline at end of file + +class AWSEMIndicators(AWSEMBase): + + def __init__(self, + burial_indicator: np.ndarray, + direct_indicator: np.ndarray, + protein_indicator: np.ndarray, + water_indicator: np.ndarray, + electrostatics_indicator: Union[np.ndarray, None], + sequence: str, # sequence is optional if we initialize from a Structure but not here + expose_indicator_functions: bool=False, + absolute_value_gamma: bool=False, + **parameters)->object: + """ + A stripped-down version of the AWSEM class that can be initialized from a set of indicator functions + + Parameters + ---------- + burial_indicator : np.ndarray + Burial indicator array, most likely accessed using the burial_indicator attribute of an AWSEM + direct_indicator : np.ndarray + Direct indicator array, most likely accessed using the direct_indicator attribute of an AWSEM + protein_indicator : np.ndarray + Protein indicator array, most likely accessed using the protein_indicator attribute of an AWSEM + water_indicator : np.ndarray + Water indicator array, most likely accessed using the water_indicator attribute of an AWSEM + electrostatics_indicator : Union[np.ndarray, None] + Electrostatics indicator array, most likely accessed using the electrostatics_indicator attribute of an AWSEM. + May be None is electrostatics were turned off (k_electrostatics=0). + sequence : str + The amino acid sequence of the protein. The sequence is assumed to be in one-letter code. + expose_indicator_functions: bool + If set to True, indicator functions of the contact and burial energy terms can be accessed by user. + absolute_value_gamma: bool + If True, replace gammas with their absolute values. This is helpful for the standard deviation approximation + + Returns + ------- + AWSEMIndicators object + + """ + super().__init__(sequence, expose_indicator_functions, **parameters) + self.burial_indicator = burial_indicator + self.direct_indicator = direct_indicator + self.protein_indicator = protein_indicator + self.water_indicator = water_indicator + self.electrostatics_indicator = electrostatics_indicator + self.sequence_mask_contact = np.full((self.N,self.N), True) + self.mask = np.full((self.N,self.N), True) + if absolute_value_gamma: + self.burial_gamma = -np.abs(self.burial_gamma) + self.direct_gamma = -np.abs(self.direct_gamma) + self.protein_gamma = -np.abs(self.protein_gamma) + self.water_gamma = -np.abs(self.water_gamma) + self.electrostatics_gamma = -np.abs(self.electrostatics_gamma) + self.absolute_value_gamma = absolute_value_gamma + # mask should have been applied when calculating the indicator functions, + # so we set it such that no further masking is performed + self.setup_model() + + def calculate_indicators(self): + pass # the function was initialized with indicators, so there's nothing to do + + +class DecoyEnsemble(): + + def __init__(self, + pdb_structures: Generator[object,None,None], + **parameters)->object: + """ + Generate DecoyEnsemble object + + Parameters + ---------- + pdb_structures : Generator[object,None,None] + yields Structure objects representing decoy structures + other parameters: + masks and cutoffs affecting the AWSEM class's indicator function calculations; + they must be the same for all structures; burial_in_context also available, but use at your own risk + + Returns + ------- + DecoyEnsemble object, which holds indicator arrays (and gammas???) computed by the AWSEM class. + """ + # the AWSEM class takes care of the indicator calculation (including masking) for us + # AWSEM normally accepts an amino acid sequence argument, but we don't need that here + # However, we do need to pass through parameters used to generate the indicator functions + awsem_obj = AWSEM(next(pdb_structures), expose_indicator_functions=True, repair_pdb=True, **parameters) + for pdb_structure in pdb_structures: + awsem_obj.pdb_structure = pdb_structure # we can use the pdb_structure setter to update structural + # stuff without fully re-initializing the object + with open('burial_indicators.npy','ab') as f: + np.save(f,awsem_obj.burial_indicator) + with open('direct_indicators.npy','ab') as f: + np.save(f,awsem_obj.direct_indicator) + with open('protein_indicators.npy','ab') as f: + np.save(f,awsem_obj.protein_indicator) + with open('water_indicators.npy','ab') as f: + np.save(f,awsem_obj.water_indicator) + with open('electrostatics_indicators.npy','ab') as f: + if hasattr(awsem_obj, 'electrostatics_indicator'): + np.save(f,awsem_obj.electrostatics_indicator) + else: + np.save(f,None) + # averages are needed to compute standard deviation + # having these be attributes allows them to be easily passed + # from the avg method to the std method + self.avg_burial = None + self.avg_direct = None + self.avg_prot = None + self.avg_wat = None + self.avg_elect = None + ################################################ + ## Attach gamma parameters from the AWSEM object + ## Kind of off-topic from my current use of this class + #self.burial_gamma = awsem_obj.burial_gamma + #self.direct_gamma = awsem_obj.direct_gamma + #self.protein_gamma = awsem_obj.protein_gamma + #self.water_gamma = awsem_obj.water_gamma + #self.electrostatics_gamma = getattr(awsem_obj, 'electrostatics_gamma', None) + ################################################ + # this might help with memory + del awsem_obj + + # To manage memory, we need the indicator attributes to be generators (see self.get_indicators). + # But to ensure that we can iterate over them more than once, we need to + # be able to reinitialize the generators. We accomplish this with properties + @property + def burial_indicators(self): + return self.get_indicators("burial_indicators.npy") + @property + def direct_indicators(self): + return self.get_indicators("direct_indicators.npy") + @property + def protein_indicators(self): + return self.get_indicators("protein_indicators.npy") + @property + def water_indicators(self): + return self.get_indicators("water_indicators.npy") + @property + def electrostatics_indicators(self): + return self.get_indicators("electrostatics_indicators.npy") + + # allows us to process indicators without holding them all in memory + # this requires that every method that acts on the indicators iterates over them + def get_indicators(self, filename): + # expecting a numpy file + with open(filename, 'rb') as f: + while True: + try: + yield np.load(f, allow_pickle=True) # needed to load None if not electrostatics + except EOFError: + break + + # average indicator functions over all decoys + def avg(self): + # average burial computation from generator + avg_burial = 0 + counter = 0 + for array in self.burial_indicators: + counter += 1 + avg_burial += array + avg_burial /= counter + self.avg_burial = avg_burial + # average direct computation from generator + avg_direct = 0 + counter = 0 + for array in self.direct_indicators: + counter += 1 + avg_direct += array + avg_direct /= counter + self.avg_direct = avg_direct + # average prot computation from generator + avg_prot = 0 + counter = 0 + for array in self.protein_indicators: + counter += 1 + avg_prot += array + avg_prot /= counter + self.avg_prot = avg_prot + # average wat computation from generator + avg_wat = 0 + counter = 0 + for array in self.water_indicators: + counter += 1 + avg_wat += array + avg_wat /= counter + self.avg_wat = avg_wat + # average elec computation from generator + # if not defined, set to zero, which will have no impact + if self.electrostatics_indicators == None: + avg_elec = 0 + else: + avg_elec = 0 + counter = 0 + for array in self.electrostatics_indicators: + counter += 1 + avg_elec += array + avg_elec /= counter + self.avg_elec = avg_elec + return self.avg_burial, self.avg_direct, self.avg_prot, self.avg_wat, self.avg_elec + + # standard deviation of indicator functions over all decoys + def std(self): + if self.avg_burial is None or self.avg_direct is None or \ + self.avg_prot is None or self.avg_wat is None or \ + self.avg_elec is None: + self.avg() # compute averages if not already done + # std burial computation from generator and previously computed average + std_burial = 0 + counter = 0 + for array in self.burial_indicators: + counter += 1 + std_burial += (array - self.avg_burial) ** 2 + std_burial = np.sqrt(std_burial / counter) + # std direct computation from generator and previously computed average + std_direct = 0 + counter = 0 + for array in self.direct_indicators: + counter += 1 + std_direct += (array - self.avg_direct) ** 2 + std_direct = np.sqrt(std_direct / counter) + # std prot computation from generator and previously computed average + std_prot = 0 + counter = 0 + for array in self.protein_indicators: + counter += 1 + std_prot += (array - self.avg_prot) ** 2 + std_prot = np.sqrt(std_prot / counter) + # std wat computation from generator and previously computed average + std_wat = 0 + counter = 0 + for array in self.water_indicators: + counter += 1 + std_wat += (array - self.avg_wat) ** 2 + std_wat = np.sqrt(std_wat / counter) + # std elec computation from generator and previously computed average + std_elec = 0 + counter = 0 + for array in self.electrostatics_indicators: + counter += 1 + std_elec += (array - self.avg_elec) ** 2 + std_elec = np.sqrt(std_elec / counter) + return std_burial, std_direct, std_prot, std_wat, std_elec diff --git a/frustratometer/classes/__init__.py b/frustratometer/classes/__init__.py index 6621727..fd8b79b 100644 --- a/frustratometer/classes/__init__.py +++ b/frustratometer/classes/__init__.py @@ -7,7 +7,7 @@ """ from .DCA import DCA -from .AWSEM import AWSEM +from .AWSEM import AWSEM, AWSEMIndicators, DecoyEnsemble from .Structure import Structure from .Map import Map from .Gamma import Gamma diff --git a/frustratometer/optimization/optimization.py b/frustratometer/optimization/optimization.py index 0bdc5e5..416f195 100644 --- a/frustratometer/optimization/optimization.py +++ b/frustratometer/optimization/optimization.py @@ -7,7 +7,7 @@ from frustratometer.classes import Frustratometer from frustratometer.classes import Structure -from frustratometer.classes import AWSEM +from frustratometer.classes import AWSEM, AWSEMIndicators, DecoyEnsemble from frustratometer.optimization.EnergyTerm import EnergyTerm from frustratometer.optimization.inner_product import compute_all_region_means from frustratometer.optimization.inner_product import build_mean_inner_product_matrix @@ -512,6 +512,7 @@ def regression_test(self, seq_index): energy=self.compute_energy(seq_index) assert np.isclose(energy,expected_energy), f"Expected energy {expected_energy} but got {energy}" + class AwsemEnergyVariance(EnergyTerm): def __init__(self, model:Frustratometer, use_numba=True, alphabet=_AA): self._use_numba=use_numba @@ -1109,111 +1110,22 @@ def find_optimal_replicas(self, max_replicas=32, n_repeats=5, n_steps=10000): if __name__ == '__main__': - native_pdb = "tests/data/1r69.pdb" - - structure_bound = Structure.full_pdb(native_pdb, chain=None) - structure_free = Structure.full_pdb(native_pdb, "A") - - model_bound = AWSEM(structure_bound, distance_cutoff_contact=10, min_sequence_separation_contact=2, expose_indicator_functions=True) - model_free = AWSEM(structure_free, distance_cutoff_contact=10, min_sequence_separation_contact=2, expose_indicator_functions=True) - reduced_alphabet = 'ADEFHIKLMNQRSTVWY' - - print(model_bound.sequence) - print(model_free.sequence) - - # binding_region=np.array([1, 2, 3, 4, 26, 27, 28, 29, 30, 31, 32, 33, 49, 50, 51, 52, 53, 54, 55, 56, 57, 68, 69, 70, 90, 91, 92, 93, 94, 95, 96, 97, 109, 110, 111, 112, 113, 114, 115, 116, 117, 127, 128, 129, 130, 131, 132, 133, 134, 163, 164, 165, 166, 167, 168, 169, 170, 171, 172, 173, 190, 191, 192, 193, 194, 195, 196, 197])-1 - energy_bound = AwsemEnergySelected(model_bound, alphabet=reduced_alphabet, selection=np.array(range(len(model_free.sequence)))) - energy_free = AwsemEnergySelected(model_free, alphabet=reduced_alphabet) - energy_average = AwsemEnergyAverage(model_free, alphabet=reduced_alphabet) - energy_std = AwsemEnergyStd(model_free, alphabet=reduced_alphabet) - energy_std_100 = AwsemEnergyStd(model_free, alphabet=reduced_alphabet, n_decoys=100) - energy_std_1000 = AwsemEnergyStd(model_free, alphabet=reduced_alphabet, n_decoys=1000) - energy_std_10000 = AwsemEnergyStd(model_free, alphabet=reduced_alphabet, n_decoys=10000) - energy_variance = AwsemEnergyVariance(model_free, alphabet=reduced_alphabet) - heterogeneity = Heterogeneity(exact=False, use_numba=True) - similarity = Similarity(model_free.sequence, use_numba=True) - - # energy_mix = energy_free - 20 * heterogeneity - energy_mix = (energy_free - energy_average) / energy_std - # energy_mix = energy_bound - energy_free - - energy_mixes = {"EnergyFree": energy_free, - "EnergyBound": energy_bound, - "Heterogeneity": heterogeneity, - "EnergyAverage": energy_average, - "EnergyStd_ndecoys100": energy_std_100, - "EnergyStd_ndecoys1000": energy_std_1000, - "EnergyStd_ndecoys10000": energy_std_10000, - "EnergyStd": energy_std, - "Zscore_ndecoys10000":(energy_free - energy_average) / energy_std_10000, - "Zscore":(energy_free - energy_average) / energy_std, - "EnergyVariance": energy_variance, - "Binding": (energy_bound - energy_free), - "Similarity": similarity, - "Ivan":energy_bound - 40 * heterogeneity, - "Takada": (energy_bound - energy_average) / energy_std, - "Ivan_binding":(energy_bound - energy_free) - 40 * heterogeneity, - "Takada_binding":(energy_free - energy_average) / energy_std + (energy_bound - energy_free), - "Ivan_Takada_binding": (energy_free - energy_average) / energy_std + (energy_bound - energy_free) - 40 * heterogeneity, - "Corrected_Takada": (energy_bound - energy_average) / (energy_std+5), - "Corrected_Takada_binding":(energy_free - energy_average) / (energy_std+5) + (energy_bound - energy_free), - "Ivan_Corrected_Takada_binding": (energy_free - energy_average) / (energy_std+5) + (energy_bound - energy_free) - 40 * heterogeneity, - "Ivan_bindidng similarity": (energy_bound - energy_free) - 40 * heterogeneity - 100*similarity, - "Corrected_Takada_binding_similarity":(energy_free - energy_average) / (energy_std+5) + (energy_bound - energy_free) - 100*similarity, - "Ivan_bindidng_similarityv2": (energy_bound - energy_free) - 40 * heterogeneity} + pdb_list = ["tests/data/1r69.pdb","tests/data/1r69.pdb","tests/data/1r69.pdb"] + pdb_structures = (Structure(pdb, chain=None) for pdb in pdb_list) + ensemble = DecoyEnsemble(pdb_structures, distance_cutoff_contact=10, min_sequence_separation_contact=10) + burial_indicators, direct_indicators, protein_indicators, water_indicators, electrostatics_indicators = ensemble.average() + average = AWSEMIndicators(burial_indicators, direct_indicators, protein_indicators, water_indicators, electrostatics_indicators, + "SISSRVKSKRIQLGLNQAELAQKVGTTQQSIEQLENGKTKRPRFLPELASALGVSVDWLLNGT") - for energy_name,energy_term in energy_mixes.items(): - print (f"Energy term: {energy_name}") - energy_term.benchmark(seq_indices=np.random.randint(0, len(reduced_alphabet), size=(100,len(structure_free.sequence)))) - if "ndecoys" not in energy_name: - energy_term.test(seq_index=np.random.randint(0, len(reduced_alphabet), size=len(structure_free.sequence))) - - monte_carlo = MonteCarlo(sequence = structure_free.sequence, energy=energy_term, alphabet=reduced_alphabet) - monte_carlo.benchmark_montecarlo_steps(n_repeats=3,n_steps=10000) - monte_carlo.benchmark_parallel_montecarlo_steps(n_repeats=3, n_steps=10000, n_replicas=8) - - - - # Profiling of the parallel tempering - import cProfile - import pstats - import io + reduced_alphabet = 'ADEFGHIKLMNQRSTVWY' - monte_carlo = MonteCarlo(sequence=model_free.sequence, energy=energy_mix, alphabet=reduced_alphabet) - # evaluation_energies={"EnergyFree": energy_free, "Heterogeneity": heterogeneity, - # "EnergyAverage": energy_average, "EnergyStd": energy_std, - # "Similarity": similarity, "Zscore":(energy_free - energy_average) / energy_std}) - - monte_carlo.benchmark_montecarlo_steps(n_repeats=3, n_steps=100) - for n_replicas in [1, 2, 4, 8, 16]: - print(f"Running parallel tempering with {n_replicas} replicas") - monte_carlo.benchmark_parallel_montecarlo_steps(n_repeats=3, n_steps=100, n_replicas=n_replicas) - - for n_replicas in [1, 2, 4, 8, 16]: - print(f"Running parallel tempering with {n_replicas} replicas") - monte_carlo.benchmark_parallel_montecarlo_steps(n_repeats=3, n_steps=1000, n_replicas=n_replicas) - - monte_carlo.find_optimal_replicas(max_replicas=32, n_repeats=5, n_steps=1000) - monte_carlo.find_optimal_replicas(max_replicas=8, n_repeats=5, n_steps=10000) - monte_carlo.find_optimal_replicas(max_replicas=8, n_repeats=5, n_steps=100000) - - # # Run the profiler - # profiler = cProfile.Profile() - # profiler.enable() + awsem_energy = AwsemEnergy(average, alphabet=reduced_alphabet) + heterogeneity = Heterogeneity(exact=False, use_numba=True) - # monte_carlo.parallel_tempering(temperatures=np.logspace(3,-4,8), n_steps=1E4, n_steps_per_cycle=1E2) - # profiler.disable() - - # # Print the stats - # s = io.StringIO() - # ps = pstats.Stats(profiler, stream=s).sort_stats('cumulative') - # ps.print_stats() - # ps.dump_stats('parallel_temperingv2.prof') - - - - + energy_mix = awsem_energy - 10*heterogeneity + monte_carlo = MonteCarlo(sequence = "SISSRVKSKRIQLGLNQAELAQKVGTTQQSIEQLENGKTKRPRFLPELASALGVSVDWLLNGT", energy=energy_mix, alphabet=reduced_alphabet) + monte_carlo.annealing(n_steps=1000) diff --git a/frustratometer/pdb/pdb.py b/frustratometer/pdb/pdb.py index 853e6e6..2feeb65 100644 --- a/frustratometer/pdb/pdb.py +++ b/frustratometer/pdb/pdb.py @@ -37,7 +37,8 @@ def download(pdbID: str,directory: Union[Path,str]=Path.cwd()) -> Path: return pdb_file def get_sequence(pdb_file: str, - chain: str + chain: str, + return_start_mask: bool=False ) -> str: """ Get a protein sequence from a pdb file @@ -46,8 +47,10 @@ def get_sequence(pdb_file: str, ---------- pdb_file : str, PDB file location. - chain: str, - Chain ID of the selected protein. + chain: str or list, + Chain ID(s) of the selected protein. + return_start_mask: bool, + Return binary mask list indicating whether each position is the start of a chain Returns ------- @@ -58,37 +61,63 @@ def get_sequence(pdb_file: str, Get a protein sequence from a PDB file :param pdb: PDB file location - :param chain: chain name of PDB file to get sequence + :param chain: chain name(s) of PDB file to get sequence :return: protein sequence """ - if ".cif" in str(pdb_file): - parser = MMCIFParser() - else: - parser = PDBParser() - structure = parser.get_structure('name', pdb_file) + if ".cif" in str(pdb_file): # BIOPYTHON + parser = MMCIFParser() # BIOPYTHON + else: # BIOPYTHON + parser = PDBParser() # BIOPYTHON + structure = parser.get_structure('name', pdb_file) #BIOPYTHON + #structure = prody.parsePDB(str(pdb_file)) # PRODY + #hv = structure.getHierView() # PRODY if chain==None: - all_chains=[i.get_id() for i in structure.get_chains()] + all_chains=[i.get_id() for i in structure.get_chains()] # BIOPYTHON + #all_chains = [structure_chain.getChid() for structure_chain in hv] # PRODY else: - all_chains=[chain] + if type(chain) == list: + all_chains = chain + elif type(chain) == str: + all_chains = [id for id in chain if id != " "] # remove spaces if present in string + else: + raise TypeError(f"chain must be list or str but was {type(chain)}") sequence = "" - for chain in all_chains: - c = structure[0][chain] + start_mask = [] + for single_chain in all_chains: + c = structure[0][single_chain] # BIOPYTHON + #c = hv[single_chain] # PRODY chain_seq = "" for residue in c: - is_regular_res = residue.has_id('CA') and residue.has_id('O') - res_id = residue.get_id()[0] - if (res_id==' ' or res_id=='H_MSE' or res_id=='H_M3L' or res_id=='H_CAS') and is_regular_res: - residue_name = residue.get_resname() + is_regular_res = residue.has_id('CA') and residue.has_id('O') # BIOPYTHON + #atom_names = [atom.getName() for atom in residue] # PRODY + #is_regular_res = ("CA" in atom_names and "O" in atom_names) # PRODY + res_id = residue.get_id()[0] #BIOPYTHON + if (res_id==' ' or res_id=='H_MSE' or res_id=='H_M3L' or res_id=='H_CAS') and is_regular_res: # BIOPYTHON + # i don't know what H_HSE, H_M3L, and H_CAS are doing + # because they aren't in three_to_one, so those should throw an error + # long story short, I don't think we have to worry about them when switching from biopython to prody + #if is_regular_res: # PRODY + residue_name = residue.get_resname() # BIOPYTHON + #residue_name = residue.getResname() # PRODY chain_seq += three_to_one[residue_name] + if chain_seq == "": # empty chain, like a nucleic acid chain (see 8ZWK) + continue # FYI, currently, a non-empty chain with certain invalid residues will throw an error at the three_to_one[residue_name] above sequence += chain_seq - return sequence + start_mask.append(1) + for _ in range(1,len(chain_seq)): + start_mask.append(0) + if return_start_mask: + return (sequence,start_mask) + else: + return sequence def get_distance_matrix(pdb_file: Union[Path,str], chain: str, - method: str = 'CB' + method: str = 'CB', + return_distance_midpoints: bool = False, ) -> np.array: """ Calculate the distance matrix of the specified atoms in a PDB file. @@ -106,6 +135,10 @@ def get_distance_matrix(pdb_file: Union[Path,str], 'CA' for using only the CA atom, 'minimum' for using the minimum distance between all atoms in each residue, 'CB_force' computes a new coordinate for the CB atom based on the CA, C, and N atoms and uses CB distance even for glycine. + return_distance_midpoints: bool + Whether to return a matrix of the same shape as distance_matrix representing the same contacts as distance_matrix + that indicates the absolute coordinates of the midpoint between the pair of atoms. This helps us compute the pair distribution + functions of the different classes of contacts. So this matrix isn't really a matrix because each "element" has 3 channels: x, y, and z Returns: np.array: The distance matrix of the selected atoms. @@ -121,7 +154,7 @@ def get_distance_matrix(pdb_file: Union[Path,str], if method == 'CA': coords = structure.select('protein and name CA' + chain_selection).getCoords() elif method == 'CB': - coords = structure.select('(protein and (name CB) or (resname GLY and name CA))' + chain_selection).getCoords() + coords = structure.select('(protein and (name CB) or (resname GLY IGL and name CA))' + chain_selection).getCoords() elif method == 'minimum': selection = structure.select('protein' + chain_selection) coords = selection.getCoords() @@ -163,8 +196,20 @@ def get_distance_matrix(pdb_file: Union[Path,str], if len(coords) == 0: raise IndexError('Empty selection for distance map') + # coords should be a numpy array of shape (N,3) distance_matrix = sdist.squareform(sdist.pdist(coords)) - return distance_matrix + assert distance_matrix.shape[0] == distance_matrix.shape[1] + if return_distance_midpoints: + midpoint_matrix = np.zeros((distance_matrix.shape[0],distance_matrix.shape[1],3)) + for i in range(distance_matrix.shape[0]): + for j in range(distance_matrix.shape[1]): + midpoint_matrix[i,j,:] = (coords[None,i,:] + coords[None,j,:])/2 + # check that indexing is consistent with distance_matrix + assert np.allclose(np.linalg.norm(coords[i,:]-coords[j,:]),distance_matrix[i,j]) + assert np.allclose(midpoint_matrix,midpoint_matrix.transpose(1,0,2)) # check symmetry + return distance_matrix, midpoint_matrix + else: + return distance_matrix def full_to_filtered_aligned_mapping(aligned_sequence: str, diff --git a/tests/test_optimization.py b/tests/test_optimization.py index 7b939a1..2f1414b 100644 --- a/tests/test_optimization.py +++ b/tests/test_optimization.py @@ -347,7 +347,7 @@ def test_diff_mean_inner_product_1_by_1(n_elements = 10): def model(request): native_pdb = "tests/data/1bfz.pdb" distance_cutoff_contact, min_sequence_separation_contact, k_electrostatics = request.param - structure = Structure.full_pdb(native_pdb, "A") + structure = Structure(native_pdb, "A") model = AWSEM(structure, distance_cutoff_contact=distance_cutoff_contact, min_sequence_separation_contact=min_sequence_separation_contact, expose_indicator_functions=True, k_electrostatics=k_electrostatics) return model