Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
194 changes: 81 additions & 113 deletions modelseedpy/core/msbuilder.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import logging
import itertools
import itertools # !!! this import is not used

import cobra
from modelseedpy.core.rast_client import RastClient
from modelseedpy.core.rast_client import RastClient # !!! this import is not used
from modelseedpy.core.msgenome import normalize_role
from modelseedpy.core.msmodel import get_gpr_string, get_reaction_constraints_from_direction
from cobra.core import Gene, Metabolite, Model, Reaction
from modelseedpy.core import FBAHelper
from modelseedpy.core.msmodel import get_gpr_string, get_reaction_constraints_from_direction # !!! none of these imports are used
from cobra.core import Gene, Metabolite, Model, Reaction # !!! Gene is not used
from modelseedpy.core.fbahelper import FBAHelper
from modelseedpy.fbapkg.mspackagemanager import MSPackageManager

SBO_ANNOTATION = "sbo"
Expand Down Expand Up @@ -249,20 +250,18 @@ def build_biomass(rxn_id, cobra_model, template, biomass_compounds, index='0'):
bio_rxn.annotation[SBO_ANNOTATION] = "SBO:0000629"
return bio_rxn


# A search function
def _aaaa(genome, ontology_term):
search_name_to_genes = {}
search_name_to_orginal = {}
for f in genome.features:
if ontology_term in f.ontology_terms:
functions = f.ontology_terms[ontology_term]
for function in functions:
search_name_to_genes, search_name_to_orginal = {}, {}
for feature in genome.features:
if ontology_term in feature.ontology_terms:
for function in feature.ontology_terms[ontology_term]:
f_norm = normalize_role(function)
if f_norm not in search_name_to_genes:
search_name_to_genes[f_norm] = set()
search_name_to_orginal[f_norm] = set()
search_name_to_orginal[f_norm].add(function)
search_name_to_genes[f_norm].add(f.id)
search_name_to_genes[f_norm].add(feature.id)
return search_name_to_genes, search_name_to_orginal


Expand All @@ -281,10 +280,9 @@ def aux_template(template):

def build_gpr2(cpx_sets):
list_of_ors = []
for cpx in cpx_sets:
for cpx, role_ids in cpx_sets.items():
list_of_ands = []
for role_id in cpx_sets[cpx]:
gene_ors = cpx_sets[cpx][role_id]
for role_id, gene_ors in role_ids.items():
if len(gene_ors) > 1:
list_of_ands.append('(' + ' or '.join(gene_ors) + ')')
else:
Expand All @@ -294,7 +292,18 @@ def build_gpr2(cpx_sets):
return ' or '.join(list_of_ors)
return list_of_ors[0]

def build_gpr(cpx_gene_role):
def _reaction_sinks(self,model):
reactions_sinks = []
for cpd_id in ['cpd02701_c0', 'cpd11416_c0', 'cpd15302_c0']:
if cpd_id in model.metabolites:
met = model.metabolites.get_by_id(cpd_id)
rxn_exchange = Reaction('SK_'+met.id, 'Sink for '+met.name, 'exchanges', 0, 1000)
rxn_exchange.add_metabolites({met: -1})
rxn_exchange.annotation[SBO_ANNOTATION] = "SBO:0000627"
reactions_sinks.append(rxn_exchange)
return reactions_sinks

def build_gpr(cpx_gene_role): #!!! Unused and redundant function
"""
example input:
{'sdh': [{'b0721': 'sdhC', 'b0722': 'sdhD', 'b0723': 'sdhA', 'b0724': 'sdhB'}]}
Expand Down Expand Up @@ -331,20 +340,17 @@ def __init__(self, genome, template=None):

def _get_template_reaction_complexes(self, template_reaction):
"""

:param template_reaction:
:return:
"""
template_reaction_complexes = {}
for cpx in template_reaction.get_complexes():
template_reaction_complexes[cpx.id] = {}
for role, (triggering, optional) in cpx.roles.items():
sn = normalize_role(role.name)
rn_norm = normalize_role(role.name)
template_reaction_complexes[cpx.id][role.id] = [
sn,
triggering,
optional,
set() if sn not in self.search_name_to_genes else set(self.search_name_to_genes[sn])
rn_norm, triggering, optional,
set() if rn_norm not in self.search_name_to_genes else set(self.search_name_to_genes[rn_norm])
]
return template_reaction_complexes

Expand All @@ -356,11 +362,11 @@ def _build_reaction_complex_gpr_sets2(match_complex, allow_incomplete_complexes=
roles = set()
role_genes = {}
for role_id in match_complex[cpx_id]:
t = match_complex[cpx_id][role_id]
complete &= len(t[3]) > 0 or not t[1] or t[2]
if len(t[3]) > 0:
complx = match_complex[cpx_id][role_id]
complete &= len(complx[3]) > 0 or not complx[1] or complx[2]
if len(complx[3]) > 0:
roles.add(role_id)
role_genes[role_id] = t[3]
role_genes[role_id] = complx[3]
#print(cpx_id, complete, roles)
if len(roles) > 0 and (allow_incomplete_complexes or complete):
complexes[cpx_id] = {}
Expand All @@ -370,19 +376,19 @@ def _build_reaction_complex_gpr_sets2(match_complex, allow_incomplete_complexes=
return complexes

@staticmethod
def _build_reaction_complex_gpr_sets(match_complex, allow_incomplete_complexes=True):
def _build_reaction_complex_gpr_sets(match_complex, allow_incomplete_complexes=True): #!!! Unused and redundant function
complexes = {}
for cpx_id in match_complex:
complete = True
roles = set()
role_genes = {}
for role_id in match_complex[cpx_id]:
t = match_complex[cpx_id][role_id]
complete &= len(t[3]) > 0 or not t[1] or t[2] # true if has genes or is not triggering or is optional
complx = match_complex[cpx_id][role_id]
complete &= len(complx[3]) > 0 or not complx[1] or complx[2] # true if has genes or is not triggering or is optional
# print(t[3])
if len(t[3]) > 0:
if len(complx[3]) > 0:
roles.add(role_id)
role_genes[role_id] = t[3]
role_genes[role_id] = complx[3]
# print(t)
# it is never complete if has no genes, only needed if assuming a complex can have all
# roles be either non triggering or optional
Expand All @@ -404,38 +410,30 @@ def get_gpr_from_template_reaction(self, template_reaction, allow_incomplete_com
template_reaction_complexes = self._get_template_reaction_complexes(template_reaction)
if len(template_reaction_complexes) == 0:
return None

# self.map_gene(template_reaction_complexes)
# print(template_reaction_complexes)
gpr_set = self._build_reaction_complex_gpr_sets2(template_reaction_complexes, allow_incomplete_complexes)
return gpr_set

@staticmethod
def _build_reaction(reaction_id, gpr_set, template, index='0', sbo=None):
template_reaction = template.reactions.get_by_id(reaction_id)

reaction_compartment = template_reaction.compartment
metabolites = {}

for cpd, value in template_reaction.metabolites.items():
for cpd, stoich in template_reaction.metabolites.items():
compartment = f"{cpd.compartment}{index}"
name = f"{cpd.name}_{compartment}"
cpd = Metabolite(cpd.id + str(index), cpd.formula, name, cpd.charge, compartment)
metabolites[cpd] = value
met = Metabolite(cpd.id+index, cpd.formula, f"{cpd.name}_{compartment}", cpd.charge, compartment)
metabolites[met] = stoich

reaction = Reaction(
"{}{}".format(template_reaction.id, index),
"{}_{}{}".format(template_reaction.name, reaction_compartment, index),
'',
template_reaction.lower_bound, template_reaction.upper_bound
)
reaction = Reaction(template_reaction.id+index,
f'{template_reaction.name}_{reaction_compartment}{index}', '',
template_reaction.lower_bound, template_reaction.upper_bound)

gpr_str = build_gpr2(gpr_set) if gpr_set else ''
reaction.add_metabolites(metabolites)
reaction.annotation["seed.reaction"] = template_reaction.reference_id
if gpr_str and len(gpr_str) > 0:
reaction.gene_reaction_rule = gpr_str # get_gpr_string(gpr_ll)

reaction.annotation["seed.reaction"] = template_reaction.reference_id
if sbo:
reaction.annotation[SBO_ANNOTATION] = sbo
return reaction
Expand All @@ -449,29 +447,28 @@ def build_exchanges(model, extra_cell='e0'):
:return:
"""
reactions_exchanges = []
for m in model.metabolites:
if m.compartment == extra_cell:
rxn_exchange_id = 'EX_' + m.id
for met in model.metabolites:
if met.compartment == extra_cell:
rxn_exchange_id = 'EX_' + met.id
if rxn_exchange_id not in model.reactions:
rxn_exchange = Reaction(rxn_exchange_id, 'Exchange for ' + m.name, 'exchanges', -1000, 1000)
rxn_exchange.add_metabolites({m: -1})
rxn_exchange = Reaction(rxn_exchange_id, 'Exchange for '+met.name, 'exchanges', -1000, 1000)
rxn_exchange.add_metabolites({met: -1})
rxn_exchange.annotation[SBO_ANNOTATION] = "SBO:0000627"
reactions_exchanges.append(rxn_exchange)
model.add_reactions(reactions_exchanges)

return reactions_exchanges

@staticmethod
def build_biomasses(model, template, index):
res = []
biomass_reactions = []
if template.name.startswith('CoreModel'):
res.append(build_biomass('bio1', model, template, core_biomass, index))
res.append(build_biomass('bio2', model, template, core_atp, index))
biomass_reactions.append(build_biomass('bio1', model, template, core_biomass, index))
biomass_reactions.append(build_biomass('bio2', model, template, core_atp, index))
if template.name.startswith('GramNeg'):
res.append(build_biomass('bio1', model, template, gramneg, index))
biomass_reactions.append(build_biomass('bio1', model, template, gramneg, index))
if template.name.startswith('GramPos'):
res.append(build_biomass('bio1', model, template, grampos, index))
return res
biomass_reactions.append(build_biomass('bio1', model, template, grampos, index))
return biomass_reactions

def auto_select_template(self):
"""
Expand All @@ -480,9 +477,7 @@ def auto_select_template(self):
"""
from modelseedpy.helpers import get_template, get_classifier
from modelseedpy.core.mstemplate import MSTemplateBuilder
genome_classifier = get_classifier('knn_ACNP_RAST_filter')
genome_class = genome_classifier.classify(self.genome)

genome_class = get_classifier('knn_ACNP_RAST_filter').classify(self.genome)
template_genome_scale_map = {
'A': 'template_gram_neg',
'C': 'template_gram_neg',
Expand All @@ -499,8 +494,7 @@ def auto_select_template(self):
if genome_class in template_genome_scale_map and genome_class in template_core_map:
self.template = MSTemplateBuilder.from_dict(get_template(template_genome_scale_map[genome_class])).build()
elif self.template is None:
raise Exception(f'unable to select template for {genome_class}')

raise Exception(f'A template is not defined for the genome class {genome_class}.')
return genome_class

def build_metabolic_reactions(self, index='0', allow_incomplete_complexes=True):
Expand All @@ -511,58 +505,42 @@ def build_metabolic_reactions(self, index='0', allow_incomplete_complexes=True):
metabolic_reactions[template_reaction.id] = gpr_set
logger.debug("[%s] gpr set: %s", template_reaction.id, gpr_set)

reactions = list(map(
lambda x: self._build_reaction(x[0], x[1], self.template, index, "SBO:0000176"),
metabolic_reactions.items()))

return reactions
return [self._build_reaction(key, val, self.template, index, "SBO:0000176")
for key, val in metabolic_reactions.items()]

def build_non_metabolite_reactions(self, cobra_model, index='0', allow_all_non_grp_reactions=False):
reactions_no_gpr = []
reactions_in_model = set(map(lambda x: x.id, cobra_model.reactions))
metabolites_in_model = set(map(lambda x: x.id, cobra_model.metabolites))
def build_non_metabolite_reactions(self, cobra_model, index='0', allow_all_non_grp_reactions=False): #!!! allow_all_non_grp_reactions in not used
reactions_sans_gpr = []
model_rxns = set(x.id for x in cobra_model.reactions)
model_mets = set(x.id for x in cobra_model.metabolites)
for rxn in self.template.reactions:
if rxn.type == 'universal' or rxn.type == 'spontaneous':
if rxn.type in ['universal', 'spontaneous']:
reaction = self._build_reaction(rxn.id, {}, self.template, index, "SBO:0000176")
reaction_metabolite_ids = set(map(lambda x: x.id, set(reaction.metabolites)))
if (len(metabolites_in_model & reaction_metabolite_ids) > 0 or allow_all_non_grp_reactions) and \
reaction.id not in reactions_in_model:
reaction.annotation["seed.reaction"] = rxn.id
reactions_no_gpr.append(reaction)
if len(model_mets.intersection(x.id for x in set(reaction.metabolites))) > 0:
if reaction.id not in model_rxns:
reaction.annotation["seed.reaction"] = rxn.id
reactions_sans_gpr.append(reaction)

return reactions_no_gpr
return reactions_sans_gpr

def build(self, model_id, index='0', allow_all_non_grp_reactions=False, annotate_with_rast=True):

if annotate_with_rast:
rast = RastClient()
res = rast.annotate_genome(self.genome)
res = rast.annotate_genome(self.genome) # !!! res is never used
self.search_name_to_genes, self.search_name_to_original = _aaaa(self.genome, 'RAST')

# rxn_roles = aux_template(self.template) # needs to be fixed to actually reflect template GPR rules
if self.template is None:
self.auto_select_template()

# construct the model
cobra_model = Model(model_id)
cobra_model.add_reactions(self.build_metabolic_reactions(index=index))
cobra_model.add_reactions(self.build_non_metabolite_reactions(cobra_model, index, allow_all_non_grp_reactions))
self.build_exchanges(cobra_model)

if self.template.name.startswith('CoreModel') or \
self.template.name.startswith('GramNeg') or self.template.name.startswith('GramPos'):
if any([self.template.name.startswith(x) for x in ('GramPos', 'CoreModel', 'GramNeg')]):
cobra_model.add_reactions(self.build_biomasses(cobra_model, self.template, index))
cobra_model.objective = 'bio1'

reactions_sinks = []
for cpd_id in ['cpd02701_c0', 'cpd11416_c0', 'cpd15302_c0']:
if cpd_id in cobra_model.metabolites:
m = cobra_model.metabolites.get_by_id(cpd_id)
rxn_exchange = Reaction('SK_' + m.id, 'Sink for ' + m.name, 'exchanges', 0, 1000)
rxn_exchange.add_metabolites({m: -1})
rxn_exchange.annotation[SBO_ANNOTATION] = "SBO:0000627"
reactions_sinks.append(rxn_exchange)
cobra_model.add_reactions(reactions_sinks)

cobra_model.add_reactions(_reaction_sinks(cobra_model))
return cobra_model

@staticmethod
Expand All @@ -574,7 +552,7 @@ def build_full_template_model(template, model_id=None, index='0'):
:param index: index for the metabolites
:return:
"""
model = Model(model_id if model_id else template.id)
model = Model(model_id or template.id)
all_reactions = []
for rxn in template.reactions:
reaction = MSBuilder._build_reaction(rxn.id, {}, template, index, "SBO:0000176")
Expand All @@ -588,40 +566,30 @@ def build_full_template_model(template, model_id=None, index='0'):
bio_rxn2 = build_biomass('bio2', model, template, core_atp, index)
model.add_reactions([bio_rxn1, bio_rxn2])
model.objective = 'bio1'
if template.name.startswith('GramNeg'):
elif template.name.startswith('GramNeg'):
bio_rxn1 = build_biomass('bio1', model, template, gramneg, index)
model.add_reactions([bio_rxn1])
model.objective = 'bio1'
if template.name.startswith('GramPos'):
elif template.name.startswith('GramPos'):
bio_rxn1 = build_biomass('bio1', model, template, grampos, index)
model.add_reactions([bio_rxn1])
model.objective = 'bio1'

reactions_sinks = []
for cpd_id in ['cpd02701_c0', 'cpd11416_c0', 'cpd15302_c0']:
if cpd_id in model.metabolites:
m = model.metabolites.get_by_id(cpd_id)
rxn_exchange = Reaction('SK_' + m.id, 'Sink for ' + m.name, 'exchanges', 0, 1000)
rxn_exchange.add_metabolites({m: -1})
rxn_exchange.annotation[SBO_ANNOTATION] = "SBO:0000627"
reactions_sinks.append(rxn_exchange)
model.add_reactions(reactions_sinks)
model.add_reactions(_reaction_sinks(model))
return model

@staticmethod
def build_metabolic_model(model_id, genome, gapfill_media=None, template=None, index='0',
allow_all_non_grp_reactions=False, annotate_with_rast=True, gapfill_model=True):
builder = MSBuilder(genome, template)
model = builder.build(model_id, index, allow_all_non_grp_reactions, annotate_with_rast)
# Gapfilling model
if gapfill_model:
model = MSBuilder.gapfill_model(model, 'bio1', builder.template, gapfill_media)
return model

@staticmethod
def gapfill_model(original_mdl, target_reaction, template, media):
FBAHelper.set_objective_from_target_reaction(original_mdl, target_reaction)
model = cobra.io.json.from_json(cobra.io.json.to_json(original_mdl))
model = cobra.io.json.from_json(cobra.io.json.to_json(original_mdl)) #!!! what is the benefit of this I/O processing?
pkgmgr = MSPackageManager.get_pkg_mgr(model)
pkgmgr.getpkg("GapfillingPkg").build_package({
"default_gapfill_templates": [template],
Expand All @@ -632,7 +600,7 @@ def gapfill_model(original_mdl, target_reaction, template, media):
pkgmgr.getpkg("KBaseMediaPkg").build_package(media)
#with open('Gapfilling.lp', 'w') as out:
# out.write(str(model.solver))
sol = model.optimize()
sol = model.optimize() # !!! sol is never used
gfresults = pkgmgr.getpkg("GapfillingPkg").compute_gapfilled_solution()
for rxnid in gfresults["reversed"]:
rxn = original_mdl.reactions.get_by_id(rxnid)
Expand All @@ -650,4 +618,4 @@ def gapfill_model(original_mdl, target_reaction, template, media):
else:
rxn.upper_bound = 0
rxn.lower_bound = -100
return original_mdl
return original_mdl