From 65e51c66783fd4fc7a10a8adb291666d3b94b315 Mon Sep 17 00:00:00 2001 From: jross Date: Mon, 30 Mar 2026 14:33:44 -0600 Subject: [PATCH 1/2] feat: enhance logging with debug timing for various operations and add well export endpoint --- api/asset.py | 77 +++++++++++++++--- api/sample.py | 71 +++++++++++----- api/thing.py | 27 +++++- core/app.py | 61 +++++--------- db/engine.py | 60 ++++++++++++-- docker-compose.yml | 13 +-- entrypoint.sh | 7 +- schemas/well_export.py | 15 ++++ services/gcs_helper.py | 77 +++++++++++++++++- services/observation_helper.py | 31 ++++++- services/sample_helper.py | 118 ++++++++++++++++++++++++--- services/thing_helper.py | 27 ++++++ services/well_details_helper.py | 140 ++++++++++++++++++++++++++++++++ tests/test_asset.py | 121 ++++++++++++++++++++++++--- tests/test_request_timing.py | 33 ++++---- 15 files changed, 749 insertions(+), 129 deletions(-) create mode 100644 schemas/well_export.py diff --git a/api/asset.py b/api/asset.py index f52122bc8..b6555ab49 100644 --- a/api/asset.py +++ b/api/asset.py @@ -14,12 +14,19 @@ # limitations under the License. # =============================================================================== +import logging +import time + from fastapi import APIRouter, Depends, UploadFile, File from fastapi_pagination.ext.sqlalchemy import paginate from sqlalchemy import select from sqlalchemy.exc import ProgrammingError from starlette.concurrency import run_in_threadpool -from starlette.status import HTTP_201_CREATED, HTTP_409_CONFLICT, HTTP_204_NO_CONTENT +from starlette.status import ( + HTTP_201_CREATED, + HTTP_204_NO_CONTENT, + HTTP_409_CONFLICT, +) from api.pagination import CustomPage from core.dependencies import ( @@ -33,16 +40,38 @@ from schemas.asset import AssetResponse, CreateAsset, UpdateAsset from services.audit_helper import audit_add from services.crud_helper import model_patcher, model_deleter +from services.env import get_bool_env from services.exceptions_helper import PydanticStyleException from services.query_helper import simple_get_by_id router = APIRouter(prefix="/asset", tags=["asset"]) +logger = logging.getLogger(__name__) + + +def is_debug_timing_enabled() -> bool: + return bool(get_bool_env("API_DEBUG_TIMING", False)) def get_storage_bucket(): - from services.gcs_helper import get_storage_bucket as get_gcs_storage_bucket + from services.gcs_helper import ( + get_storage_bucket as get_gcs_storage_bucket, + ) - return get_gcs_storage_bucket() + started_at = time.perf_counter() + try: + return get_gcs_storage_bucket() + finally: + if is_debug_timing_enabled(): + logger.info( + "asset storage bucket resolved", + extra={ + "event": "asset_storage_bucket_resolved", + "bucket_resolution_ms": round( + (time.perf_counter() - started_at) * 1000, + 2, + ), + }, + ) def database_error_handler(payload: CreateAsset, error: ProgrammingError) -> None: @@ -53,8 +82,8 @@ def database_error_handler(payload: CreateAsset, error: ProgrammingError) -> Non error_message = error.orig.args[0]["M"] if ( - error_message - == 'null value in column "thing_id" of relation "asset_thing_association" violates not-null constraint' + error_message == 'null value in column "thing_id" of relation ' + '"asset_thing_association" violates not-null constraint' ): """ Developer's notes @@ -70,10 +99,13 @@ def database_error_handler(payload: CreateAsset, error: ProgrammingError) -> Non "input": {"thing_id": payload.thing_id}, } - raise PydanticStyleException(status_code=HTTP_409_CONFLICT, detail=[detail]) + raise PydanticStyleException( + status_code=HTTP_409_CONFLICT, + detail=[detail], + ) -# POST ========================================================================= +# POST ======================================================================= @router.post( "/upload", status_code=HTTP_201_CREATED, @@ -86,7 +118,21 @@ async def upload_asset( from services.gcs_helper import gcs_upload # GCS client calls are synchronous and can block for large uploads. + request_started_at = time.perf_counter() uri, blob_name = await run_in_threadpool(gcs_upload, file, bucket) + if is_debug_timing_enabled(): + logger.info( + "asset upload request completed", + extra={ + "event": "asset_upload_request_completed", + "filename": file.filename, + "content_type": file.content_type, + "upload_request_ms": round( + (time.perf_counter() - request_started_at) * 1000, + 2, + ), + }, + ) return { "uri": uri, "storage_path": blob_name, @@ -110,7 +156,11 @@ async def add_asset( # this storage path and thing_id from services.gcs_helper import check_asset_exists - existing_asset = check_asset_exists(session, storage_path, thing_id=thing_id) + existing_asset = check_asset_exists( + session, + storage_path, + thing_id=thing_id, + ) if existing_asset: # If an asset already exists, return it return existing_asset @@ -136,7 +186,7 @@ async def add_asset( database_error_handler(asset_data, e) -# GET ========================================================================== +# GET ======================================================================== """ Developer's notes @@ -189,11 +239,11 @@ async def get_asset( asset = simple_get_by_id(session, Asset, asset_id) - add_signed_url(asset, bucket) + asset = await run_in_threadpool(add_signed_url, asset, bucket) return asset -# PATCH ======================================================================== +# PATCH ====================================================================== @router.patch("/{asset_id}") async def update_asset( asset_id: int, @@ -207,7 +257,7 @@ async def update_asset( return model_patcher(session, Asset, asset_id, asset_data, user=user) -# DELETE ======================================================================= +# DELETE ===================================================================== @router.delete("/{asset_id}", status_code=HTTP_204_NO_CONTENT) @@ -215,7 +265,8 @@ async def delete_asset( asset_id: int, session: session_dependency, user: admin_dependency ): - # TODO: Interesting issue here. we don't have a way of tracking who deleted a record + # TODO: Interesting issue here. We don't have a way of tracking + # who deleted a record. return model_deleter(session, Asset, asset_id) diff --git a/api/sample.py b/api/sample.py index 22003a122..fdd471cb4 100644 --- a/api/sample.py +++ b/api/sample.py @@ -27,11 +27,13 @@ ) from db.sample import Sample from schemas import ResourceNotFoundResponse -from schemas.sample import SampleResponse, CreateSample, UpdateSample -from services.query_helper import simple_get_by_id +from schemas.sample import CreateSample, SampleResponse, UpdateSample from services.crud_helper import model_patcher, model_deleter, model_adder from services.exceptions_helper import PydanticStyleException -from services.sample_helper import get_db_samples +from services.sample_helper import ( + get_db_samples, + get_sample_by_id_with_relationships, +) router = APIRouter( prefix="/sample", @@ -42,44 +44,56 @@ # TODO: add the following database validation handlers # invalid sample_id # invalid lexicon terms -# sample_date of the Sample model cannot be before the event_date of the FieldEvent model +# sample_date of the Sample model cannot be before the +# event_date of the FieldEvent model def database_error_handler( - payload: CreateSample | UpdateSample, error: IntegrityError | ProgrammingError + payload: CreateSample | UpdateSample, + error: IntegrityError | ProgrammingError, ) -> None: """ Handle errors raised by the database when adding or updating a sample. """ error_message = error.orig.args[0]["M"] if ( - error_message - == 'duplicate key value violates unique constraint "sample_sample_name_key"' + error_message == "duplicate key value violates unique " + 'constraint "sample_sample_name_key"' ): detail = { "loc": ["body", "sample_name"], - "msg": f"Sample with sample_name {payload.sample_name} already exists.", + "msg": ( + f"Sample with sample_name {payload.sample_name} " "already exists." + ), "type": "value_error", "input": {"sample_name": payload.sample_name}, } elif ( error_message - == 'insert or update on table "sample" violates foreign key constraint "sample_field_activity_id_fkey"' + == 'insert or update on table "sample" violates foreign key constraint ' + '"sample_field_activity_id_fkey"' ): detail = { "loc": ["body", "field_activity_id"], - "msg": f"FieldActivity with ID {payload.field_activity_id} does not exist.", + "msg": ( + f"FieldActivity with ID {payload.field_activity_id} " "does not exist." + ), "type": "value_error", "input": {"field_activity_id": payload.field_activity_id}, } - raise PydanticStyleException(status_code=HTTP_409_CONFLICT, detail=[detail]) + raise PydanticStyleException( + status_code=HTTP_409_CONFLICT, + detail=[detail], + ) # ============= Post ============================================= @router.post("", status_code=HTTP_201_CREATED) async def add_sample( - sample_data: CreateSample, session: session_dependency, user: admin_dependency + sample_data: CreateSample, + session: session_dependency, + user: admin_dependency, ) -> SampleResponse: """ Endpoint to add a sample. @@ -106,7 +120,13 @@ async def update_sample( try: # since this is only one instance N+1 is not a concern for # FieldActivity, FieldEvent, and Thing - return model_patcher(session, Sample, sample_id, sample_data, user=user) + return model_patcher( + session, + Sample, + sample_id, + sample_data, + user=user, + ) except (IntegrityError, ProgrammingError) as e: database_error_handler(sample_data, e) @@ -124,27 +144,38 @@ async def get_samples( """ Endpoint to retrieve samples. """ - return get_db_samples(session, thing_id, sort=sort, order=order, filter_=filter_) + return get_db_samples( + session, + thing_id, + sort=sort, + order=order, + filter_=filter_, + ) @router.get("/{sample_id}", summary="Get Sample by ID") async def get_sample_by_id( - sample_id: int, session: session_dependency, user: viewer_dependency + sample_id: int, + session: session_dependency, + user: viewer_dependency, ) -> SampleResponse | ResourceNotFoundResponse: """ Endpoint to retrieve a sample by its ID. """ - # since this is only one instance N+1 is not a concern - # FieldActivity, FieldEvent, and Thing - return simple_get_by_id(session, Sample, sample_id) + return get_sample_by_id_with_relationships(session, sample_id) # ======= DELETE =============================================================== -@router.delete("/{sample_id}", summary="Delete Sample by ID") +@router.delete( + "/{sample_id}", + summary="Delete Sample by ID", +) async def delete_sample_by_id( - sample_id: int, session: session_dependency, user: admin_dependency + sample_id: int, + session: session_dependency, + user: admin_dependency, ) -> Response: return model_deleter(session, Sample, sample_id) diff --git a/api/thing.py b/api/thing.py index 4a8be334e..0babb871f 100644 --- a/api/thing.py +++ b/api/thing.py @@ -52,6 +52,7 @@ UpdateWellScreen, ) from schemas.well_details import WellDetailsResponse +from schemas.well_export import WellExportResponse from services.crud_helper import model_patcher, model_adder, model_deleter from services.exceptions_helper import PydanticStyleException from services.lexicon_helper import get_terms_by_category @@ -69,7 +70,10 @@ modify_well_descriptor_tables, WELL_DESCRIPTOR_MODEL_MAP, ) -from services.well_details_helper import get_well_details_payload +from services.well_details_helper import ( + get_well_details_payload, + get_well_export_payload, +) router = APIRouter(prefix="/thing", tags=["thing"]) @@ -201,6 +205,27 @@ async def get_well_details( ) +@router.get( + "/water-well/{thing_id}/export", + summary="Get water well export payload", + status_code=HTTP_200_OK, +) +async def get_well_export( + user: viewer_dependency, + thing_id: int, + session: session_dependency, + request: Request, +) -> WellExportResponse: + """ + Retrieve the minimal payload needed for field sheet export generation. + """ + return get_well_export_payload( + session=session, + request=request, + thing_id=thing_id, + ) + + @router.get( "/water-well/{thing_id}/well-screen", summary="Get well screens by water well ID", diff --git a/core/app.py b/core/app.py index 43fd705ac..02408ff67 100644 --- a/core/app.py +++ b/core/app.py @@ -13,10 +13,10 @@ # See the License for the specific language governing permissions and # limitations under the License. # =============================================================================== +import logging import os -import asyncio import time -import logging +from uuid import uuid4 from contextlib import asynccontextmanager from typing import AsyncGenerator @@ -44,7 +44,6 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]: seed_all(10, skip_if_exists=True) app.state.instance_ready_at = time.perf_counter() - app.state.first_request_pending = True logger.info( "instance startup complete", extra={ @@ -69,55 +68,39 @@ def create_base_app() -> FastAPI: ) app.state.process_boot_started_at = time.perf_counter() app.state.instance_ready_at = None - app.state.first_request_pending = True - app.state.request_timing_lock = asyncio.Lock() @app.middleware("http") - async def log_request_timing(request: Request, call_next): - request_started_at = time.perf_counter() - async with app.state.request_timing_lock: - is_first_request = app.state.first_request_pending - app.state.first_request_pending = False + async def log_request_lifecycle(request: Request, call_next): + request_id = uuid4().hex + request.state.request_id = request_id + logger.info( + "request started %s %s", + request.method, + request.url.path, + extra={ + "event": "request_started", + "request_id": request_id, + "method": request.method, + "path": request.url.path, + }, + ) status_code = 500 try: response = await call_next(request) status_code = response.status_code return response finally: - request_duration_ms = round( - (time.perf_counter() - request_started_at) * 1000, 2 - ) - startup_ms = round( - ( - (app.state.instance_ready_at or request_started_at) - - app.state.process_boot_started_at - ) - * 1000, - 2, - ) - uptime_before_request_ms = round( - ( - ( - request_started_at - - (app.state.instance_ready_at or request_started_at) - ) - ) - * 1000, - 2, - ) - request_kind = "cold" if is_first_request else "warm" - logger.info( - "request timing", + "request completed %s %s status=%s", + request.method, + request.url.path, + status_code, extra={ - "event": "request_timing", - "request_kind": request_kind, + "event": "request_completed", + "request_id": request_id, "method": request.method, "path": request.url.path, "status_code": status_code, - "request_duration_ms": request_duration_ms, - "startup_ms": startup_ms, - "uptime_before_request_ms": uptime_before_request_ms, }, ) diff --git a/db/engine.py b/db/engine.py index eb8413065..c7a7721dc 100644 --- a/db/engine.py +++ b/db/engine.py @@ -16,13 +16,13 @@ import copy import getpass +import logging import os +import time from contextlib import contextmanager from dotenv import load_dotenv -from sqlalchemy import ( - create_engine, -) +from sqlalchemy import create_engine, event from sqlalchemy.ext.asyncio import create_async_engine from sqlalchemy.orm import ( sessionmaker, @@ -31,9 +31,49 @@ from services.env import get_bool_env -# Load .env file - don't override env vars already set (e.g., by test framework) +# Load .env file. Do not override env vars already set by the runtime. load_dotenv(override=False) driver = os.environ.get("DB_DRIVER", "") +logger = logging.getLogger(__name__) + + +def _install_pool_logging(engine): + @event.listens_for(engine, "checkout") + def log_checkout(dbapi_connection, connection_record, connection_proxy): + connection_record.info["checked_out_at"] = time.perf_counter() + logger.info( + "db pool checkout", + extra={ + "event": "db_pool_checkout", + "pool_status": engine.pool.status(), + }, + ) + + @event.listens_for(engine, "checkin") + def log_checkin(dbapi_connection, connection_record): + checked_out_at = connection_record.info.pop("checked_out_at", None) + hold_ms = None + if checked_out_at is not None: + hold_ms = round((time.perf_counter() - checked_out_at) * 1000, 2) + logger.info( + "db pool checkin", + extra={ + "event": "db_pool_checkin", + "connection_hold_ms": hold_ms, + "pool_status": engine.pool.status(), + }, + ) + + @event.listens_for(engine, "invalidate") + def log_invalidate(dbapi_connection, connection_record, exception): + logger.warning( + "db pool invalidate", + extra={ + "event": "db_pool_invalidate", + "pool_status": engine.pool.status(), + "exception_type": (type(exception).__name__ if exception else None), + }, + ) def get_iam_login_token() -> str: @@ -85,7 +125,11 @@ def asyncify_connection(): else: connect_kwargs["password"] = password - connection = connector.connect_async(instance_name, "asyncpg", **connect_kwargs) + connection = connector.connect_async( + instance_name, + "asyncpg", + **connect_kwargs, + ) return AsyncAdapt_asyncpg_connection( engine.dialect.dbapi, @@ -133,6 +177,7 @@ def getconn(): # Configure connection pool for parallel transfers pool_size = int(os.environ.get("DB_POOL_SIZE", "10")) max_overflow = int(os.environ.get("DB_MAX_OVERFLOW", "20")) + pool_timeout = int(os.environ.get("DB_POOL_TIMEOUT", "30")) engine = create_engine( "postgresql+pg8000://", @@ -140,8 +185,10 @@ def getconn(): echo=False, pool_size=pool_size, max_overflow=max_overflow, + pool_timeout=pool_timeout, pool_pre_ping=True, ) + _install_pool_logging(engine) return engine connector = Connector() @@ -172,6 +219,7 @@ def getconn(): # max_overflow: additional connections during peak usage pool_size = int(os.environ.get("DB_POOL_SIZE", "10")) max_overflow = int(os.environ.get("DB_MAX_OVERFLOW", "20")) + pool_timeout = int(os.environ.get("DB_POOL_TIMEOUT", "30")) engine = create_engine( url, @@ -179,8 +227,10 @@ def getconn(): plugins=["geoalchemy2"], pool_size=pool_size, max_overflow=max_overflow, + pool_timeout=pool_timeout, pool_pre_ping=True, # Verify connections before use ) + _install_pool_logging(engine) async_engine = create_async_engine( url.replace("postgresql+pg8000", "postgresql+asyncpg"), diff --git a/docker-compose.yml b/docker-compose.yml index fcbd09f10..78120d761 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -2,16 +2,17 @@ services: db: - build: - context: . - dockerfile: ./docker/db/Dockerfile - platform: linux/amd64 + image: postgis/postgis:17-3.5 + # build: +# context: . +# dockerfile: ./docker/db/Dockerfile +# platform: linux/amd64 environment: - POSTGRES_USER=${POSTGRES_USER} - POSTGRES_PASSWORD=${POSTGRES_PASSWORD} - POSTGRES_DB=ocotilloapi_dev ports: - - 5432:5432 + - "5432:5432" volumes: - postgres_data_dev:/var/lib/postgresql/data - ./docker/db/init:/docker-entrypoint-initdb.d:ro @@ -41,7 +42,7 @@ services: - PYGEOAPI_POSTGRES_PASSWORD=${POSTGRES_PASSWORD} - GOOGLE_APPLICATION_CREDENTIALS=/app/gcs_credentials.json ports: - - 8000:8000 + - "8000:8000" depends_on: db: condition: service_healthy # <-- wait for DB to be ready diff --git a/entrypoint.sh b/entrypoint.sh index 18e0badc9..c89c621c1 100644 --- a/entrypoint.sh +++ b/entrypoint.sh @@ -7,6 +7,7 @@ DB_NAME="${POSTGRES_DB:-postgres}" APP_MODULE="${APP_MODULE:-main:app}" APP_PORT="${APP_PORT:-8000}" RUN_MIGRATIONS="${RUN_MIGRATIONS:-true}" +UVICORN_RELOAD="${UVICORN_RELOAD:-false}" # Wait for PostgreSQL to be ready until PGPASSWORD="$POSTGRES_PASSWORD" pg_isready -h "$DB_HOST" -p "$DB_PORT" -U "$POSTGRES_USER" -d "$DB_NAME"; do @@ -21,4 +22,8 @@ if [ "$RUN_MIGRATIONS" = "true" ]; then fi echo "Starting the application..." -uvicorn "$APP_MODULE" --host 0.0.0.0 --port "$APP_PORT" --reload +if [ "$UVICORN_RELOAD" = "true" ]; then + uvicorn "$APP_MODULE" --host 0.0.0.0 --port "$APP_PORT" --reload +else + uvicorn "$APP_MODULE" --host 0.0.0.0 --port "$APP_PORT" +fi diff --git a/schemas/well_export.py b/schemas/well_export.py new file mode 100644 index 000000000..9259e3de7 --- /dev/null +++ b/schemas/well_export.py @@ -0,0 +1,15 @@ +from pydantic import BaseModel, ConfigDict, Field + +from schemas.contact import ContactResponse +from schemas.deployment import DeploymentResponse +from schemas.sensor import SensorResponse +from schemas.thing import WellResponse + + +class WellExportResponse(BaseModel): + model_config = ConfigDict(from_attributes=True) + + well: WellResponse + contacts: list[ContactResponse] = Field(default_factory=list) + sensors: list[SensorResponse] = Field(default_factory=list) + deployments: list[DeploymentResponse] = Field(default_factory=list) diff --git a/services/gcs_helper.py b/services/gcs_helper.py index 55f2cdf2d..b524f2115 100644 --- a/services/gcs_helper.py +++ b/services/gcs_helper.py @@ -18,6 +18,7 @@ import json import logging import os +import time from functools import lru_cache from hashlib import md5 @@ -26,12 +27,18 @@ from core.settings import settings from db import Asset, AssetThingAssociation +from services.env import get_bool_env GCS_BUCKET_NAME = os.environ.get("GCS_BUCKET_NAME") GCS_BUCKET_BASE_URL = f"https://storage.cloud.google.com/{GCS_BUCKET_NAME}/uploads" GCS_LOOKUP_TIMEOUT_SECS = float(os.environ.get("GCS_LOOKUP_TIMEOUT_SECS", "15")) GCS_UPLOAD_TIMEOUT_SECS = float(os.environ.get("GCS_UPLOAD_TIMEOUT_SECS", "120")) logger = logging.getLogger(__name__) +HASH_CHUNK_SIZE = 1024 * 1024 + + +def is_debug_timing_enabled() -> bool: + return bool(get_bool_env("API_DEBUG_TIMING", False)) @lru_cache(maxsize=1) @@ -51,8 +58,8 @@ def get_storage_client(): # Create storage client client = storage.Client(credentials=creds) else: - # Use application default credentials (from ~/.config/gcloud/application_default_credentials.json) - # This will automatically use GOOGLE_APPLICATION_CREDENTIALS if set, or the default location + # Use application default credentials from gcloud or + # GOOGLE_APPLICATION_CREDENTIALS when present. client = storage.Client() return client @@ -69,33 +76,90 @@ def get_storage_bucket(client=None, bucket: str = None): return _get_cached_bucket(bucket_name) +def _log_stage(stage: str, started_at: float, **extra): + if not is_debug_timing_enabled(): + return + logger.info( + "gcs stage timing", + extra={ + "event": "gcs_stage_timing", + "stage": stage, + "duration_ms": round((time.perf_counter() - started_at) * 1000, 2), + **extra, + }, + ) + + +def _hash_file(file_obj) -> str: + hasher = md5() + while True: + chunk = file_obj.read(HASH_CHUNK_SIZE) + if not chunk: + break + hasher.update(chunk) + return hasher.hexdigest() + + def make_blob_name_and_uri(file): + started_at = time.perf_counter() head, extension = os.path.splitext(file.filename) - file_id = md5(file.file.read()).hexdigest() + file.file.seek(0) + file_id = _hash_file(file.file) + file.file.seek(0) blob_name = f"{head}_{file_id}{extension}" uri = f"{GCS_BUCKET_BASE_URL}/{blob_name}" + _log_stage( + "hash_file", + started_at, + filename=file.filename, + blob_name=blob_name, + ) return blob_name, uri def gcs_upload(file: UploadFile, bucket=None): + upload_started_at = time.perf_counter() if bucket is None: + bucket_started_at = time.perf_counter() bucket = get_storage_bucket() + _log_stage("resolve_bucket", bucket_started_at, filename=file.filename) # make file id from hash of file contents file.file.seek(0) blob_name, uri = make_blob_name_and_uri(file) + lookup_started_at = time.perf_counter() eblob = bucket.get_blob(blob_name, timeout=GCS_LOOKUP_TIMEOUT_SECS) + _log_stage( + "lookup_blob", + lookup_started_at, + filename=file.filename, + blob_name=blob_name, + blob_exists=eblob is not None, + ) if not eblob: blob = bucket.blob(blob_name) file.file.seek(0) + upload_blob_started_at = time.perf_counter() blob.upload_from_file( file.file, content_type=file.content_type, timeout=GCS_UPLOAD_TIMEOUT_SECS, ) + _log_stage( + "upload_blob", + upload_blob_started_at, + filename=file.filename, + blob_name=blob_name, + ) + _log_stage( + "upload_request_total", + upload_started_at, + filename=file.filename, + blob_name=blob_name, + ) return uri, blob_name @@ -105,12 +169,19 @@ def gcs_remove(uri: str, bucket): def add_signed_url(asset: Asset, bucket): + started_at = time.perf_counter() try: asset.signed_url = bucket.blob(asset.storage_path).generate_signed_url( version="v4", expiration=datetime.timedelta(minutes=15), method="GET", ) + _log_stage( + "generate_signed_url", + started_at, + asset_id=getattr(asset, "id", None), + storage_path=getattr(asset, "storage_path", None), + ) except Exception: logger.warning( "Failed to generate signed URL for asset_id=%s storage_path=%s", diff --git a/services/observation_helper.py b/services/observation_helper.py index c049d5b19..f99241db0 100644 --- a/services/observation_helper.py +++ b/services/observation_helper.py @@ -1,4 +1,6 @@ from datetime import datetime +import logging +import time from typing import List from fastapi import Request, Query @@ -24,8 +26,15 @@ GroundwaterLevelObservationResponse, ) from services.exceptions_helper import PydanticStyleException +from services.env import get_bool_env from services.query_helper import simple_get_by_id, order_sort_filter +logger = logging.getLogger(__name__) + + +def is_debug_timing_enabled() -> bool: + return bool(get_bool_env("API_DEBUG_TIMING", False)) + def get_activity_type_from_request(request: Request) -> str: path = request.url.path @@ -235,7 +244,27 @@ def get_observations( if not order: sql = sql.order_by(Observation.observation_datetime.desc()) - return paginate(query=sql, conn=session) + started_at = time.perf_counter() + page = paginate(query=sql, conn=session) + if is_debug_timing_enabled(): + duration_ms = round((time.perf_counter() - started_at) * 1000, 2) + logger.info( + "observation query completed path=%s thing_id=%s sensor_id=%s sample_id=%s duration_ms=%s", + request.url.path, + thing_id, + sensor_id, + sample_id, + duration_ms, + extra={ + "event": "observation_query_completed", + "path": request.url.path, + "thing_id": thing_id, + "sensor_id": sensor_id, + "sample_id": sample_id, + "duration_ms": duration_ms, + }, + ) + return page def verify_observed_property_corresponds_with_activity_type( diff --git a/services/sample_helper.py b/services/sample_helper.py index 20423de80..e6591ca24 100644 --- a/services/sample_helper.py +++ b/services/sample_helper.py @@ -1,9 +1,88 @@ -from sqlalchemy.orm import Session, joinedload +import logging +import time + +from fastapi import HTTPException from fastapi_pagination.ext.sqlalchemy import paginate +from sqlalchemy import select +from sqlalchemy.orm import Session, joinedload, selectinload +from starlette.status import HTTP_404_NOT_FOUND -from db import FieldEvent, FieldActivity, FieldEventParticipant, Sample +from db import ( + Contact, + FieldActivity, + FieldEvent, + FieldEventParticipant, + GroupThingAssociation, + LocationThingAssociation, + Sample, + Thing, + ThingAquiferAssociation, + ThingContactAssociation, +) from services.query_helper import order_sort_filter +logger = logging.getLogger(__name__) + + +THING_RESPONSE_BASE = ( + joinedload(Sample.field_activity) + .joinedload(FieldActivity.field_event) + .joinedload(FieldEvent.thing) +) + + +THING_RESPONSE_LOADER_OPTIONS = ( + THING_RESPONSE_BASE.selectinload(Thing.location_associations).selectinload( + LocationThingAssociation.location + ), + THING_RESPONSE_BASE.selectinload(Thing.well_purposes), + THING_RESPONSE_BASE.selectinload(Thing.well_casing_materials), + THING_RESPONSE_BASE.selectinload(Thing.links), + THING_RESPONSE_BASE.selectinload(Thing.measuring_points), + THING_RESPONSE_BASE.selectinload(Thing.monitoring_frequencies), + THING_RESPONSE_BASE.selectinload(Thing.aquifer_associations).selectinload( + ThingAquiferAssociation.aquifer_system + ), + THING_RESPONSE_BASE.selectinload(Thing.group_associations).selectinload( + GroupThingAssociation.group + ), + THING_RESPONSE_BASE.selectinload(Thing.notes), + THING_RESPONSE_BASE.selectinload(Thing.permission_history), + THING_RESPONSE_BASE.selectinload(Thing.data_provenance), + THING_RESPONSE_BASE.selectinload(Thing.status_history), +) + + +CONTACT_RESPONSE_BASE = selectinload(Sample.field_event_participant) +CONTACT_RESPONSE_PARTICIPANT = CONTACT_RESPONSE_BASE.selectinload( + FieldEventParticipant.participant +) +CONTACT_RESPONSE_THING_ASSOCIATIONS = CONTACT_RESPONSE_PARTICIPANT.selectinload( + Contact.thing_associations +) + + +CONTACT_RESPONSE_LOADER_OPTIONS = ( + CONTACT_RESPONSE_PARTICIPANT.selectinload(Contact.emails), + CONTACT_RESPONSE_PARTICIPANT.selectinload(Contact.phones), + CONTACT_RESPONSE_PARTICIPANT.selectinload(Contact.addresses), + CONTACT_RESPONSE_PARTICIPANT.selectinload(Contact.incomplete_nma_phones), + CONTACT_RESPONSE_THING_ASSOCIATIONS.selectinload(ThingContactAssociation.thing), + CONTACT_RESPONSE_PARTICIPANT.selectinload(Contact.notes), +) + + +SAMPLE_ACTIVITY_LOADER = joinedload(Sample.field_activity).joinedload( + FieldActivity.field_event +) + + +SAMPLE_LOADER_OPTIONS = ( + SAMPLE_ACTIVITY_LOADER, + *THING_RESPONSE_LOADER_OPTIONS, + *CONTACT_RESPONSE_LOADER_OPTIONS, +) + def get_db_samples( session: Session, @@ -12,15 +91,7 @@ def get_db_samples( sort: str | None = None, filter_: str | None = None, ): - query = session.query(Sample).options( - # Eagerly load related FieldActivity and FieldEvent to avoid N+1 problem - joinedload(Sample.field_activity) - .joinedload(FieldActivity.field_event) - .joinedload(FieldEvent.thing), - joinedload(Sample.field_event_participant).joinedload( - FieldEventParticipant.participant - ), # Eagerly load related Contact - ) + query = session.query(Sample).options(*SAMPLE_LOADER_OPTIONS) if thing_id: query = query.join(FieldActivity) @@ -30,3 +101,28 @@ def get_db_samples( query = order_sort_filter(query, Sample, sort, order, filter_) return paginate(query) + + +def get_sample_by_id_with_relationships(session: Session, sample_id: int) -> Sample: + started_at = time.perf_counter() + sql = select(Sample).where(Sample.id == sample_id) + sql = sql.options(*SAMPLE_LOADER_OPTIONS) + sample = session.execute(sql).unique().scalar_one_or_none() + if sample is None: + raise HTTPException( + status_code=HTTP_404_NOT_FOUND, + detail=f"Sample with ID {sample_id} not found.", + ) + + duration_ms = round((time.perf_counter() - started_at) * 1000, 2) + logger.info( + "sample lookup completed sample_id=%s duration_ms=%s", + sample_id, + duration_ms, + extra={ + "event": "sample_lookup_completed", + "sample_id": sample_id, + "duration_ms": duration_ms, + }, + ) + return sample diff --git a/services/thing_helper.py b/services/thing_helper.py index 221cb1214..5b32f54c9 100644 --- a/services/thing_helper.py +++ b/services/thing_helper.py @@ -14,6 +14,8 @@ # limitations under the License. # =============================================================================== from datetime import datetime +import logging +import time from zoneinfo import ZoneInfo from fastapi import Request, HTTPException @@ -43,9 +45,17 @@ from services.audit_helper import audit_add from services.crud_helper import model_patcher from services.exceptions_helper import PydanticStyleException +from services.env import get_bool_env from services.geospatial_helper import make_within_wkt from services.query_helper import make_query, order_sort_filter, simple_get_by_id +logger = logging.getLogger(__name__) + + +def is_debug_timing_enabled() -> bool: + return bool(get_bool_env("API_DEBUG_TIMING", False)) + + WELL_DESCRIPTOR_MODEL_MAP = { "well_purposes": (WellPurpose, "purpose"), "well_casing_materials": (WellCasingMaterial, "material"), @@ -160,6 +170,7 @@ def verify_thing_type_correspondence(thing: Thing, thing_type: str): def get_thing_of_a_thing_type_by_id(session: Session, request: Request, thing_id: int): + started_at = time.perf_counter() thing_type = get_thing_type_from_request(request) sql = select(Thing).where(Thing.id == thing_id) @@ -175,6 +186,22 @@ def get_thing_of_a_thing_type_by_id(session: Session, request: Request, thing_id ) verify_thing_type_correspondence(thing, thing_type) + if is_debug_timing_enabled(): + duration_ms = round((time.perf_counter() - started_at) * 1000, 2) + logger.info( + "thing lookup completed path=%s thing_id=%s thing_type=%s duration_ms=%s", + request.url.path, + thing_id, + thing_type, + duration_ms, + extra={ + "event": "thing_lookup_completed", + "path": request.url.path, + "thing_id": thing_id, + "thing_type": thing_type, + "duration_ms": duration_ms, + }, + ) return thing diff --git a/services/well_details_helper.py b/services/well_details_helper.py index 25f100880..7408d15a6 100644 --- a/services/well_details_helper.py +++ b/services/well_details_helper.py @@ -1,3 +1,6 @@ +import logging +import time + from sqlalchemy import select from sqlalchemy.orm import Session, joinedload, selectinload @@ -14,8 +17,35 @@ ThingContactAssociation, WellScreen, ) +from services.env import get_bool_env from services.thing_helper import get_thing_of_a_thing_type_by_id +logger = logging.getLogger(__name__) + + +def is_debug_timing_enabled() -> bool: + return bool(get_bool_env("API_DEBUG_TIMING", False)) + + +def _log_payload_stage(payload_name: str, stage: str, thing_id: int, started_at: float): + if not is_debug_timing_enabled(): + return + duration_ms = round((time.perf_counter() - started_at) * 1000, 2) + logger.info( + "%s stage=%s thing_id=%s duration_ms=%s", + payload_name, + stage, + thing_id, + duration_ms, + extra={ + "event": "well_payload_stage_timing", + "payload_name": payload_name, + "stage": stage, + "thing_id": thing_id, + "duration_ms": duration_ms, + }, + ) + def get_well_details_payload( session: Session, @@ -23,8 +53,12 @@ def get_well_details_payload( thing_id: int, recent_observation_limit: int = 100, ): + payload_started_at = time.perf_counter() + stage_started_at = time.perf_counter() well = get_thing_of_a_thing_type_by_id(session, request, thing_id) + _log_payload_stage("well_details", "load_well", thing_id, stage_started_at) + stage_started_at = time.perf_counter() contacts = session.scalars( select(Contact) .join(ThingContactAssociation) @@ -40,7 +74,9 @@ def get_well_details_payload( ) .order_by(Contact.id) ).all() + _log_payload_stage("well_details", "load_contacts", thing_id, stage_started_at) + stage_started_at = time.perf_counter() sensors = session.scalars( select(Sensor) .join(Deployment) @@ -48,27 +84,50 @@ def get_well_details_payload( .distinct() .order_by(Sensor.id) ).all() + _log_payload_stage("well_details", "load_sensors", thing_id, stage_started_at) + stage_started_at = time.perf_counter() deployments = session.scalars( select(Deployment) .where(Deployment.thing_id == well.id) .options(selectinload(Deployment.sensor)) .order_by(Deployment.installation_date.desc(), Deployment.id.desc()) ).all() + _log_payload_stage( + "well_details", + "load_deployments", + thing_id, + stage_started_at, + ) + stage_started_at = time.perf_counter() well_screens = session.scalars( select(WellScreen) .where(WellScreen.thing_id == well.id) .order_by(WellScreen.screen_depth_top.asc(), WellScreen.id.asc()) ).all() + _log_payload_stage( + "well_details", + "load_well_screens", + thing_id, + stage_started_at, + ) + stage_started_at = time.perf_counter() groundwater_parameter_id = ( session.query(Parameter) .filter(Parameter.parameter_name == "groundwater level") .one() .id ) + _log_payload_stage( + "well_details", + "resolve_groundwater_parameter", + thing_id, + stage_started_at, + ) + stage_started_at = time.perf_counter() recent_groundwater_level_observations = session.scalars( select(Observation) .join(Sample) @@ -82,10 +141,17 @@ def get_well_details_payload( .order_by(Observation.observation_datetime.desc(), Observation.id.desc()) .limit(recent_observation_limit) ).all() + _log_payload_stage( + "well_details", + "load_recent_groundwater_level_observations", + thing_id, + stage_started_at, + ) latest_field_event_sample = None if recent_groundwater_level_observations: latest_sample_id = recent_groundwater_level_observations[0].sample_id + stage_started_at = time.perf_counter() latest_field_event_sample = session.scalar( select(Sample) .where(Sample.id == latest_sample_id) @@ -98,6 +164,19 @@ def get_well_details_payload( ), ) ) + _log_payload_stage( + "well_details", + "load_latest_field_event_sample", + thing_id, + stage_started_at, + ) + + _log_payload_stage( + "well_details", + "payload_total", + thing_id, + payload_started_at, + ) return { "well": well, @@ -108,3 +187,64 @@ def get_well_details_payload( "recent_groundwater_level_observations": recent_groundwater_level_observations, "latest_field_event_sample": latest_field_event_sample, } + + +def get_well_export_payload( + session: Session, + request, + thing_id: int, +): + payload_started_at = time.perf_counter() + stage_started_at = time.perf_counter() + well = get_thing_of_a_thing_type_by_id(session, request, thing_id) + _log_payload_stage("well_export", "load_well", thing_id, stage_started_at) + + stage_started_at = time.perf_counter() + contacts = session.scalars( + select(Contact) + .join(ThingContactAssociation) + .where(ThingContactAssociation.thing_id == well.id) + .options( + selectinload(Contact.emails), + selectinload(Contact.phones), + selectinload(Contact.addresses), + selectinload(Contact.incomplete_nma_phones), + selectinload(Contact.thing_associations).selectinload( + ThingContactAssociation.thing + ), + ) + .order_by(Contact.id) + ).all() + _log_payload_stage("well_export", "load_contacts", thing_id, stage_started_at) + + stage_started_at = time.perf_counter() + sensors = session.scalars( + select(Sensor) + .join(Deployment) + .where(Deployment.thing_id == well.id) + .distinct() + .order_by(Sensor.id) + ).all() + _log_payload_stage("well_export", "load_sensors", thing_id, stage_started_at) + + stage_started_at = time.perf_counter() + deployments = session.scalars( + select(Deployment) + .where(Deployment.thing_id == well.id) + .options(selectinload(Deployment.sensor)) + .order_by(Deployment.installation_date.desc(), Deployment.id.desc()) + ).all() + _log_payload_stage( + "well_export", + "load_deployments", + thing_id, + stage_started_at, + ) + _log_payload_stage("well_export", "payload_total", thing_id, payload_started_at) + + return { + "well": well, + "contacts": contacts, + "sensors": sensors, + "deployments": deployments, + } diff --git a/tests/test_asset.py b/tests/test_asset.py index 008cade90..081fe580d 100644 --- a/tests/test_asset.py +++ b/tests/test_asset.py @@ -13,6 +13,9 @@ # See the License for the specific language governing permissions and # limitations under the License. # =============================================================================== +import io +import logging +import os from datetime import timezone from unittest.mock import patch @@ -23,6 +26,7 @@ from core.dependencies import viewer_function, admin_function, editor_function from db import Asset from schemas import DT_FMT +from services import gcs_helper from tests import ( client, cleanup_post_test, @@ -30,12 +34,18 @@ cleanup_patch_test, ) -# CLASSES, FIXTURES, AND FUNCTIONS ============================================= +# CLASSES, FIXTURES, AND FUNCTIONS =========================================== class MockBlob: + def __init__(self): + self.upload_calls = 0 + self.last_file_position = None + def upload_from_file(self, *args, **kwargs): - pass + self.upload_calls += 1 + if args: + self.last_file_position = args[0].tell() def generate_signed_url(self, *args, **kwargs): return "https://storage.googleapis.com/mock-bucket/mock-asset" @@ -47,11 +57,15 @@ def delete(self, *args, **kwargs): class MockStorageBucket: name = "mock-bucket" + def __init__(self, existing_blob=None): + self._blob = MockBlob() + self._existing_blob = existing_blob + def blob(self, *args, **kwargs): - return MockBlob() + return self._blob def get_blob(self, *args, **kwargs): - return None + return self._existing_blob def mock_storage_bucket(): @@ -77,7 +91,7 @@ def override_dependency_fixture(): app.dependency_overrides = {} -# POST & UPLOAD tests ========================================================== +# POST & UPLOAD tests ======================================================== def test_upload_asset(): @@ -94,6 +108,86 @@ def test_upload_asset(): assert "storage_path" in data +def test_gcs_upload_logs_stage_timings(caplog): + bucket = MockStorageBucket() + upload = type( + "UploadStub", + (), + { + "filename": "field-compilation.pdf", + "content_type": "application/pdf", + "file": io.BytesIO(b"pdf-bytes" * 2048), + }, + )() + + with patch.dict(os.environ, {"API_DEBUG_TIMING": "true"}): + with caplog.at_level(logging.INFO, logger="services.gcs_helper"): + uri, blob_name = gcs_helper.gcs_upload(upload, bucket) + + stage_logs = [ + record for record in caplog.records if record.msg == "gcs stage timing" + ] + + assert uri.endswith(blob_name) + assert {record.stage for record in stage_logs} >= { + "hash_file", + "lookup_blob", + "upload_blob", + "upload_request_total", + } + + +def test_gcs_upload_skips_existing_blob(): + existing_blob = object() + bucket = MockStorageBucket(existing_blob=existing_blob) + upload = type( + "UploadStub", + (), + { + "filename": "existing.pdf", + "content_type": "application/pdf", + "file": io.BytesIO(b"existing-pdf"), + }, + )() + + gcs_helper.gcs_upload(upload, bucket) + + assert bucket._blob.upload_calls == 0 + + +def test_make_blob_name_and_uri_rewinds_file_after_hashing(): + upload = type( + "UploadStub", + (), + { + "filename": "rewind.pdf", + "file": io.BytesIO(b"a" * (gcs_helper.HASH_CHUNK_SIZE + 5)), + }, + )() + + blob_name, uri = gcs_helper.make_blob_name_and_uri(upload) + + assert blob_name in uri + assert upload.file.tell() == 0 + + +def test_gcs_upload_rewinds_before_upload(): + bucket = MockStorageBucket() + upload = type( + "UploadStub", + (), + { + "filename": "rewind-before-upload.pdf", + "content_type": "application/pdf", + "file": io.BytesIO(b"b" * (gcs_helper.HASH_CHUNK_SIZE + 7)), + }, + )() + + gcs_helper.gcs_upload(upload, bucket) + + assert bucket._blob.last_file_position == 0 + + def test_add_asset(water_well_thing): payload = { "release_status": "draft", @@ -119,7 +213,7 @@ def test_add_asset(water_well_thing): assert data["storage_path"] == payload["storage_path"] assert data["mime_type"] == payload["mime_type"] assert data["size"] == payload["size"] - assert data["signed_url"] == None + assert data["signed_url"] is None cleanup_post_test(Asset, data["id"]) @@ -146,7 +240,7 @@ def test_add_asset_409_bad_thing_id(water_well_thing): assert data["detail"][0]["input"] == {"thing_id": bad_thing_id} -# GET tests ==================================================================== +# GET tests ================================================================== def test_get_assets(asset, asset_with_associated_thing): @@ -166,7 +260,7 @@ def test_get_assets(asset, asset_with_associated_thing): assert data["items"][0]["size"] == asset.size assert data["items"][0]["uri"] == asset.uri assert data["items"][0]["storage_service"] == asset.storage_service - assert data["items"][0]["signed_url"] == None + assert data["items"][0]["signed_url"] is None assert data["items"][1]["id"] == asset_with_associated_thing.id assert data["items"][1][ @@ -187,11 +281,14 @@ def test_get_assets(asset, asset_with_associated_thing): data["items"][1]["storage_service"] == asset_with_associated_thing.storage_service ) - assert data["items"][1]["signed_url"] == None + assert data["items"][1]["signed_url"] is None def test_get_assets_thing_id(asset_with_associated_thing, water_well_thing): - with patch("api.asset.get_storage_bucket", return_value=MockStorageBucket()): + with patch( + "api.asset.get_storage_bucket", + return_value=MockStorageBucket(), + ): query_parameters = {"thing_id": water_well_thing.id} response = client.get("/asset", params=query_parameters) assert response.status_code == 200 @@ -231,7 +328,7 @@ def test_get_asset_by_id_404_not_found(asset): assert data["detail"] == f"Asset with ID {bad_id} not found." -# PATCH tests ================================================================== +# PATCH tests ================================================================ def test_patch_asset(asset): @@ -260,7 +357,7 @@ def test_patch_asset_404_not_found(asset): assert data["detail"] == f"Asset with ID {bad_id} not found." -# DELETE tests ================================================================= +# DELETE tests =============================================================== def test_delete_asset(second_asset): diff --git a/tests/test_request_timing.py b/tests/test_request_timing.py index ae6b255ed..78e4a9f4f 100644 --- a/tests/test_request_timing.py +++ b/tests/test_request_timing.py @@ -5,7 +5,7 @@ from core.app import create_base_app -def test_request_timing_logs_cold_then_warm(caplog): +def test_request_lifecycle_logs_start_and_completion(caplog): app = create_base_app() @app.get("/ping") @@ -20,24 +20,23 @@ async def ping(): startup_logs = [ record for record in caplog.records if record.msg == "instance startup complete" ] - request_logs = [ - record for record in caplog.records if record.msg == "request timing" + request_started_logs = [ + record for record in caplog.records if record.msg == "request started" + ] + request_completed_logs = [ + record for record in caplog.records if record.msg == "request completed" ] - assert len(startup_logs) == 1 - assert len(request_logs) == 2 + assert len(request_started_logs) == 2 + assert len(request_completed_logs) == 2 assert startup_logs[0].event == "instance_startup_complete" assert startup_logs[0].startup_ms >= 0 - - assert request_logs[0].request_kind == "cold" - assert request_logs[0].path == "/ping" - assert request_logs[0].status_code == 200 - assert request_logs[0].request_duration_ms >= 0 - assert request_logs[0].startup_ms >= 0 - - assert request_logs[1].request_kind == "warm" - assert request_logs[1].path == "/ping" - assert request_logs[1].status_code == 200 - assert request_logs[1].request_duration_ms >= 0 - assert request_logs[1].uptime_before_request_ms >= 0 + assert request_started_logs[0].event == "request_started" + assert request_started_logs[0].request_id + assert request_started_logs[0].path == "/ping" + assert request_completed_logs[0].event == "request_completed" + assert request_completed_logs[0].request_id == request_started_logs[0].request_id + assert request_completed_logs[0].status_code == 200 + assert request_completed_logs[1].request_id == request_started_logs[1].request_id + assert request_completed_logs[1].status_code == 200 From 2ffbd273c7d0cab9dbca91d965ebe7f8ddcdd458 Mon Sep 17 00:00:00 2001 From: jross Date: Mon, 30 Mar 2026 14:49:40 -0600 Subject: [PATCH 2/2] feat: optimize logging for request and asset upload processes, and enhance debug timing functionality --- api/asset.py | 2 +- core/app.py | 9 ++------- db/engine.py | 10 ++++++++++ services/gcs_helper.py | 14 ++++++++------ services/sample_helper.py | 29 ++++++++--------------------- 5 files changed, 29 insertions(+), 35 deletions(-) diff --git a/api/asset.py b/api/asset.py index b6555ab49..456b5d3a7 100644 --- a/api/asset.py +++ b/api/asset.py @@ -125,7 +125,7 @@ async def upload_asset( "asset upload request completed", extra={ "event": "asset_upload_request_completed", - "filename": file.filename, + "upload_filename": file.filename, "content_type": file.content_type, "upload_request_ms": round( (time.perf_counter() - request_started_at) * 1000, diff --git a/core/app.py b/core/app.py index 02408ff67..102256d4f 100644 --- a/core/app.py +++ b/core/app.py @@ -74,9 +74,7 @@ async def log_request_lifecycle(request: Request, call_next): request_id = uuid4().hex request.state.request_id = request_id logger.info( - "request started %s %s", - request.method, - request.url.path, + "request started", extra={ "event": "request_started", "request_id": request_id, @@ -91,10 +89,7 @@ async def log_request_lifecycle(request: Request, call_next): return response finally: logger.info( - "request completed %s %s status=%s", - request.method, - request.url.path, - status_code, + "request completed", extra={ "event": "request_completed", "request_id": request_id, diff --git a/db/engine.py b/db/engine.py index c7a7721dc..2d2f0d9f9 100644 --- a/db/engine.py +++ b/db/engine.py @@ -37,7 +37,17 @@ logger = logging.getLogger(__name__) +def is_pool_logging_enabled() -> bool: + return bool( + get_bool_env("DB_POOL_LOGGING", False) + or get_bool_env("API_DEBUG_TIMING", False) + ) + + def _install_pool_logging(engine): + if not is_pool_logging_enabled(): + return + @event.listens_for(engine, "checkout") def log_checkout(dbapi_connection, connection_record, connection_proxy): connection_record.info["checked_out_at"] = time.perf_counter() diff --git a/services/gcs_helper.py b/services/gcs_helper.py index b524f2115..da9ce606d 100644 --- a/services/gcs_helper.py +++ b/services/gcs_helper.py @@ -79,14 +79,16 @@ def get_storage_bucket(client=None, bucket: str = None): def _log_stage(stage: str, started_at: float, **extra): if not is_debug_timing_enabled(): return + record_extra = { + "event": "gcs_stage_timing", + "stage": stage, + "duration_ms": round((time.perf_counter() - started_at) * 1000, 2), + } + if "filename" in extra: + record_extra["upload_filename"] = extra.pop("filename") logger.info( "gcs stage timing", - extra={ - "event": "gcs_stage_timing", - "stage": stage, - "duration_ms": round((time.perf_counter() - started_at) * 1000, 2), - **extra, - }, + extra={**record_extra, **extra}, ) diff --git a/services/sample_helper.py b/services/sample_helper.py index e6591ca24..0f25dd9a1 100644 --- a/services/sample_helper.py +++ b/services/sample_helper.py @@ -1,6 +1,3 @@ -import logging -import time - from fastapi import HTTPException from fastapi_pagination.ext.sqlalchemy import paginate from sqlalchemy import select @@ -21,9 +18,6 @@ ) from services.query_helper import order_sort_filter -logger = logging.getLogger(__name__) - - THING_RESPONSE_BASE = ( joinedload(Sample.field_activity) .joinedload(FieldActivity.field_event) @@ -60,6 +54,9 @@ CONTACT_RESPONSE_THING_ASSOCIATIONS = CONTACT_RESPONSE_PARTICIPANT.selectinload( Contact.thing_associations ) +CONTACT_RESPONSE_THING = CONTACT_RESPONSE_THING_ASSOCIATIONS.selectinload( + ThingContactAssociation.thing +) CONTACT_RESPONSE_LOADER_OPTIONS = ( @@ -67,7 +64,7 @@ CONTACT_RESPONSE_PARTICIPANT.selectinload(Contact.phones), CONTACT_RESPONSE_PARTICIPANT.selectinload(Contact.addresses), CONTACT_RESPONSE_PARTICIPANT.selectinload(Contact.incomplete_nma_phones), - CONTACT_RESPONSE_THING_ASSOCIATIONS.selectinload(ThingContactAssociation.thing), + CONTACT_RESPONSE_THING, CONTACT_RESPONSE_PARTICIPANT.selectinload(Contact.notes), ) @@ -103,8 +100,10 @@ def get_db_samples( return paginate(query) -def get_sample_by_id_with_relationships(session: Session, sample_id: int) -> Sample: - started_at = time.perf_counter() +def get_sample_by_id_with_relationships( + session: Session, + sample_id: int, +) -> Sample: sql = select(Sample).where(Sample.id == sample_id) sql = sql.options(*SAMPLE_LOADER_OPTIONS) sample = session.execute(sql).unique().scalar_one_or_none() @@ -113,16 +112,4 @@ def get_sample_by_id_with_relationships(session: Session, sample_id: int) -> Sam status_code=HTTP_404_NOT_FOUND, detail=f"Sample with ID {sample_id} not found.", ) - - duration_ms = round((time.perf_counter() - started_at) * 1000, 2) - logger.info( - "sample lookup completed sample_id=%s duration_ms=%s", - sample_id, - duration_ms, - extra={ - "event": "sample_lookup_completed", - "sample_id": sample_id, - "duration_ms": duration_ms, - }, - ) return sample