From 250f2c7a5781ea92b0fb8f5381bfa56dde8ab9b4 Mon Sep 17 00:00:00 2001
From: jakeross
Date: Fri, 27 Mar 2026 09:59:42 -0600
Subject: [PATCH] feat: enhance GCS upload handling with async support and improved error logging

---
 api/asset.py           |  4 +++-
 services/gcs_helper.py | 47 ++++++++++++++++++++++++++++++++++++++++++-------------
 2 files changed, 37 insertions(+), 14 deletions(-)

diff --git a/api/asset.py b/api/asset.py
index 90afdf9c..f52122bc 100644
--- a/api/asset.py
+++ b/api/asset.py
@@ -18,6 +18,7 @@
 from fastapi_pagination.ext.sqlalchemy import paginate
 from sqlalchemy import select
 from sqlalchemy.exc import ProgrammingError
+from starlette.concurrency import run_in_threadpool
 from starlette.status import HTTP_201_CREATED, HTTP_409_CONFLICT, HTTP_204_NO_CONTENT
 
 from api.pagination import CustomPage
@@ -84,7 +85,8 @@ async def upload_asset(
 ) -> dict:
     from services.gcs_helper import gcs_upload
 
-    uri, blob_name = gcs_upload(file, bucket)
+    # GCS client calls are synchronous and can block for large uploads.
+    uri, blob_name = await run_in_threadpool(gcs_upload, file, bucket)
     return {
         "uri": uri,
         "storage_path": blob_name,
diff --git a/services/gcs_helper.py b/services/gcs_helper.py
index 237af5cb..55f2cdf2 100644
--- a/services/gcs_helper.py
+++ b/services/gcs_helper.py
@@ -16,7 +16,9 @@
 import base64
 import datetime
 import json
+import logging
 import os
+from functools import lru_cache
 from hashlib import md5
 
 from fastapi import UploadFile
@@ -27,8 +29,12 @@
 GCS_BUCKET_NAME = os.environ.get("GCS_BUCKET_NAME")
 GCS_BUCKET_BASE_URL = f"https://storage.cloud.google.com/{GCS_BUCKET_NAME}/uploads"
+GCS_LOOKUP_TIMEOUT_SECS = float(os.environ.get("GCS_LOOKUP_TIMEOUT_SECS", "15"))
+GCS_UPLOAD_TIMEOUT_SECS = float(os.environ.get("GCS_UPLOAD_TIMEOUT_SECS", "120"))
 
+logger = logging.getLogger(__name__)
 
 
+@lru_cache(maxsize=1)
 def get_storage_client():
     from google.cloud import storage
     from google.oauth2 import service_account
@@ -51,14 +57,16 @@ def get_storage_client():
     return client
 
 
-def get_storage_bucket(client=None, bucket: str = None):
-    if client is None:
-        client = get_storage_client()
+@lru_cache(maxsize=8)
+def _get_cached_bucket(bucket_name: str):
+    return get_storage_client().bucket(bucket_name)
 
-    if bucket is None:
-        bucket = GCS_BUCKET_NAME
 
-    return client.bucket(bucket)
+def get_storage_bucket(client=None, bucket: str = None):
+    bucket_name = bucket or GCS_BUCKET_NAME
+    if client is not None:
+        return client.bucket(bucket_name)
+    return _get_cached_bucket(bucket_name)
 
 
 def make_blob_name_and_uri(file):
@@ -78,12 +86,16 @@ def gcs_upload(file: UploadFile, bucket=None):
     file.file.seek(0)
     blob_name, uri = make_blob_name_and_uri(file)
 
-    eblob = bucket.get_blob(blob_name)
+    eblob = bucket.get_blob(blob_name, timeout=GCS_LOOKUP_TIMEOUT_SECS)
 
     if not eblob:
         blob = bucket.blob(blob_name)
         file.file.seek(0)
-        blob.upload_from_file(file.file, content_type=file.content_type)
+        blob.upload_from_file(
+            file.file,
+            content_type=file.content_type,
+            timeout=GCS_UPLOAD_TIMEOUT_SECS,
+        )
 
     return uri, blob_name
 
@@ -93,11 +105,20 @@ def gcs_remove(uri: str, bucket):
 
 
 def add_signed_url(asset: Asset, bucket):
-    asset.signed_url = bucket.blob(asset.storage_path).generate_signed_url(
-        version="v4",
-        expiration=datetime.timedelta(minutes=15),
-        method="GET",
-    )
+    try:
+        asset.signed_url = bucket.blob(asset.storage_path).generate_signed_url(
+            version="v4",
+            expiration=datetime.timedelta(minutes=15),
+            method="GET",
+        )
+    except Exception:
+        logger.warning(
+            "Failed to generate signed URL for asset_id=%s storage_path=%s",
+            getattr(asset, "id", None),
+            getattr(asset, "storage_path", None),
+            exc_info=True,
+        )
+        asset.signed_url = None
     return asset