From 02a81a7991740d8f124845bd3846f8f417fbac7c Mon Sep 17 00:00:00 2001
From: Gaurav Vaidya
Date: Fri, 19 Dec 2025 17:53:27 -0500
Subject: [PATCH] Load conflations from stars.renci.org.

This currently takes 30s for GeneProtein and 60ms for DrugChemical, but we
should be able to speed this up quite a bit.
---
 api/server.py | 52 +++++++++++++++++++++++++++++++++++++++++++++++++--
 1 file changed, 50 insertions(+), 2 deletions(-)

diff --git a/api/server.py b/api/server.py
index 1a2965c4..97111781 100755
--- a/api/server.py
+++ b/api/server.py
@@ -13,6 +13,7 @@
 import warnings
 import os
 import re
+from contextlib import asynccontextmanager
 from typing import Dict, List, Union, Annotated, Optional
 
 from fastapi import Body, FastAPI, Query
@@ -25,11 +26,48 @@
 SOLR_HOST = os.getenv("SOLR_HOST", "localhost")
 SOLR_PORT = os.getenv("SOLR_PORT", "8983")
+BABEL_VERSION = os.getenv("BABEL_VERSION")
+BABEL_DOWNLOAD_URL_BASE = os.getenv("BABEL_DOWNLOAD_URL_BASE", "https://stars.renci.org/var/babel_outputs/")
+CONFLATIONS = os.getenv("CONFLATIONS", "GeneProtein,DrugChemical")
 
-app = FastAPI(**get_app_info())
 
 logger = logging.getLogger(__name__)
 logging.basicConfig(level=os.getenv("LOGLEVEL", logging.INFO))
 
+conflations = {}
+@asynccontextmanager
+async def lifespan(app: FastAPI):
+    # Initialization code.
+    for conflation_name in [conflation.strip() for conflation in CONFLATIONS.split(",")]:
+        # TODO: measure memory usage so we can tell how much holding conflations in memory is costing us.
+        conflations[conflation_name] = {}
+        start_time = time.time_ns()
+
+        # Download the conflation.
+        download_url = f"{BABEL_DOWNLOAD_URL_BASE}{BABEL_VERSION}/conflation/{conflation_name}.txt"
+        logger.info(f"Downloading conflation {conflation_name} from {download_url} as a text file.")
+        # TODO: download as a Gzip file (which will save us a whole bunch of time).
+        with httpx.stream("GET", download_url) as response:
+            response.raise_for_status()
+
+            for line in response.iter_lines():
+                # The simplest way to store the conflation information would be the same way we do it with
+                # NodeNorm: we store conflations[conflation_name][curie2] = [curie1, curie2, ...]
+                # Lots of duplication, but hopefully we can reuse the lists and it won't cost us too much.
+                row = json.loads(line)
+                for curie in row:
+                    if curie in conflations[conflation_name]:
+                        logger.warning(f"CURIE {curie} already loaded in conflation {conflation_name}, overwriting.")
+                    conflations[conflation_name][curie] = row
+
+        end_time = time.time_ns()
+        logger.info(f"Loaded {len(conflations[conflation_name].keys()):,} conflations for {conflation_name} in {(end_time - start_time)/1_000_000:,.2f} ms.")
+
+    # Yield so that the FastAPI app can start up.
+    yield
+
+    # Cleanup code would go here if we had any.
+
+app = FastAPI(lifespan=lifespan, **get_app_info())
 app.add_middleware(
     CORSMiddleware,
     allow_origins=["*"],
@@ -71,7 +109,7 @@ async def status() -> Dict:
     result = response.json()
 
     # Do we know the Babel version and version URL? It will be stored in an environmental variable if we do.
-    babel_version = os.environ.get("BABEL_VERSION", "unknown")
+    babel_version = BABEL_VERSION
     babel_version_url = os.environ.get("BABEL_VERSION_URL", "")
 
     # We should have a status for name_lookup_shard1_replica_n1.
@@ -82,6 +120,15 @@ async def status() -> Dict:
     if 'index' in core:
         index = core['index']
 
+        conflation_information = {}
+        for conflation_name in conflations:
+            conflation_information[conflation_name] = {
+                'distinct_curies': len(conflations[conflation_name].keys()),
+                # Since we reuse row objects in the conflation, we can count the distinct number of id()s to
+                # figure out the number of distinct conflations.
+                'distinct_conflations': len({id(v) for v in conflations[conflation_name].values()}),
+            }
+
         return {
             'status': 'ok',
             'message': 'Reporting results from primary core.',
@@ -95,6 +142,7 @@ async def status() -> Dict:
             'segmentCount': index.get('segmentCount', ''),
             'lastModified': index.get('lastModified', ''),
             'size': index.get('size', ''),
+            'conflations': conflation_information,
         }
     else:
         return {
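
One observation on the lifespan hunk: httpx.stream() is the synchronous httpx
API, so the event loop blocks while each conflation downloads. That is harmless
here, because FastAPI does not start serving requests until the startup half of
lifespan() has yielded, but if this loading ever moves into a background refresh
task it would need the async client instead. A sketch of that variant
(load_conflation_async is a hypothetical helper, not part of this patch):

import json

import httpx


async def load_conflation_async(download_url: str) -> dict:
    """Async variant of the conflation loader; safe to run while requests are being served."""
    conflation = {}
    async with httpx.AsyncClient(timeout=None) as client:
        async with client.stream("GET", download_url) as response:
            response.raise_for_status()
            async for line in response.aiter_lines():
                row = json.loads(line)
                # As in the patch, every CURIE in a row points at the same shared list.
                for curie in row:
                    conflation[curie] = row
    return conflation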
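
On the "download as a Gzip file" TODO: one way to do it is to stream the
response and decompress incrementally with zlib, so neither the compressed nor
the decompressed file is ever held in memory at once. The sketch below assumes
a gzipped artifact (e.g. GeneProtein.txt.gz) is published at the same URL with
a .gz suffix and is served as opaque bytes rather than with Content-Encoding:
gzip; neither assumption is verified by this patch.

import json
import zlib

import httpx


def iter_gzipped_conflation_rows(download_url: str):
    """Yield one parsed conflation row (a list of CURIEs) per line of a gzipped file."""
    decompressor = zlib.decompressobj(wbits=31)  # wbits=31 selects gzip framing
    buffer = b""
    with httpx.stream("GET", download_url) as response:
        response.raise_for_status()
        for chunk in response.iter_bytes():
            buffer += decompressor.decompress(chunk)
            # Everything before the last newline is a complete line; keep the tail in the buffer.
            *lines, buffer = buffer.split(b"\n")
            for line in lines:
                if line.strip():
                    yield json.loads(line)
    # Flush the decompressor and parse any final line without a trailing newline.
    buffer += decompressor.flush()
    if buffer.strip():
        yield json.loads(buffer)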
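
On the memory-measurement TODO: the standard library's tracemalloc module is a
low-effort starting point; start tracing before the loading loop and log the
traced total afterwards. A minimal sketch, reusing the module's existing logger:

import tracemalloc

tracemalloc.start()
# ... run the conflation-loading loop from lifespan() here ...
current, peak = tracemalloc.get_traced_memory()
logger.info(f"Conflations are holding {current / 1_048_576:,.1f} MiB (peak {peak / 1_048_576:,.1f} MiB).")
tracemalloc.stop()

Note that tracemalloc only counts allocations made while tracing is on, and it
adds noticeable overhead, so it is better suited to a one-off measurement than
to being left on in production.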
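
Finally, the 'distinct_conflations' statistic in status() works because every
CURIE in a row maps to the same list object, so the number of distinct id()
values equals the number of rows loaded. A tiny illustration (the two CURIEs
are just example identifiers):

# One conflation row shared by both of its CURIEs.
row = ["NCBIGene:5243", "UniProtKB:P08183"]
conflation = {curie: row for curie in row}

# Two distinct CURIEs, but only one distinct conflation row.
assert len(conflation.keys()) == 2
assert len({id(v) for v in conflation.values()}) == 1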