Pull request overview
Adds a Celery beat (cron) job to detect and requeue “missing” frames by querying the archive API, and refactors archive querying / queue routing to reduce duplication.
Changes:
- Introduces a new `celery.requeue_missing_frames` periodic task and wires it into the existing cron container/entrypoint.
- Adds a shared `banzai.query` module with retrying archive GET + archive frame pagination helpers.
- Refactors queue selection into `get_processing_queue()` and reuses the archive query helper in FITS downloading / BPM ingestion.
Reviewed changes
Copilot reviewed 10 out of 11 changed files in this pull request and generated 10 comments.
| File | Description |
|---|---|
| uv.lock | Bumps lco-banzai to 1.36.0 and updates lock metadata. |
| pyproject.toml | Updates version and switches console script to new cron entrypoint. |
| helm-chart/banzai/templates/listener.yaml | Updates the cron container command to banzai_cron. |
| CHANGES.md | Adds 1.36.0 changelog entry for daily requeueing. |
| banzai/utils/observation_utils.py | Adds tenacity retry to calibration-block archive query. |
| banzai/utils/instrument_utils.py | Adds shared get_processing_queue() helper. |
| banzai/utils/fits_utils.py | Replaces direct requests.get with archive_get helper for downloads. |
| banzai/settings.py | Adds requeue cron configuration knobs. |
| banzai/scheduling.py | Adds the requeue_missing_frames Celery task and archive querying usage. |
| banzai/query.py | New module for archive querying helpers and cross-matching. |
| banzai/main.py | Adds new cron entrypoint scheduling and reuses archive_get / get_processing_queue. |

    REFERENCE_CATALOG_URL = os.getenv('REFERENCE_CATALOG_URL', 'http://phot-catalog.lco.gtn/')

    REQUEUE_MISSING_FRAMES_TIME = datetime.datetime(0, 0, 0, hour=14, minute=30)
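
A note on the `REQUEUE_MISSING_FRAMES_TIME` line above: `datetime.datetime` requires a year of at least 1 and a month and day of at least 1, so `datetime.datetime(0, 0, 0, ...)` raises `ValueError` when the module is imported. If the setting is only meant to carry a time of day, `datetime.time` is one option. This is a minimal sketch under that assumption; how the cron entrypoint actually reads the setting is not shown in this snippet.

```python
import datetime

# The constructor from the diff fails: year 0, month 0 and day 0 are out of range.
try:
    datetime.datetime(0, 0, 0, hour=14, minute=30)
    constructor_error = None
except ValueError as exc:
    constructor_error = exc

# Assumed alternative: keep only the time of day (14:30).
REQUEUE_MISSING_FRAMES_TIME = datetime.time(hour=14, minute=30)

print(constructor_error is not None)
print(REQUEUE_MISSING_FRAMES_TIME.hour, REQUEUE_MISSING_FRAMES_TIME.minute)
```

A consumer that currently reads `.hour` and `.minute` from the setting would work unchanged with a `datetime.time` value.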

    @@ -0,0 +1,69 @@
    from tenacity import retry, wait_exponential, stop_after_attempt
    import requests
    from banzai.utils.logs import get_logger

    logger.error(message,
                 extra_tags={'attempt_number': archive_get.statistics['attempt_number']}
                 )
    response.raise_for_status()

    def frames_from_archive(start, end, obstype, reduction_level, runtime_context, raw=False, related_frames=False):
        archive_params = {'OBSTYPE': obstype, 'reduction_level': reduction_level, 'related_frames': related_frames}
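
For readers unfamiliar with the pagination that a helper like `frames_from_archive` has to handle, here is a rough sketch of the general pattern. It is not the code in this PR. It assumes each page is a JSON object with a `results` list and a `next` URL (a common shape for paginated REST APIs, not confirmed by the diff), and `iter_paginated` and `fetch_page` are hypothetical names; `fetch_page` stands in for whatever retrying GET the real helper uses.

```python
from typing import Callable, Iterator, Optional


def iter_paginated(fetch_page: Callable[[str, Optional[dict]], dict],
                   url: str, params: Optional[dict] = None) -> Iterator[dict]:
    """Yield every item from a paginated API by following 'next' links.

    Assumption: each page is a dict with a 'results' list and a 'next'
    URL that is None on the last page.
    """
    next_url = url
    next_params = params
    while next_url:
        page = fetch_page(next_url, next_params)
        yield from page.get('results', [])
        next_url = page.get('next')
        # Assumed: the 'next' URL already carries the query string.
        next_params = None


# Usage with an in-memory stand-in for the archive (hypothetical data).
fake_pages = {
    'page1': {'results': [{'id': 1}, {'id': 2}], 'next': 'page2'},
    'page2': {'results': [{'id': 3}], 'next': None},
}
frames = list(iter_paginated(lambda u, p: fake_pages[u], 'page1', {'OBSTYPE': 'BIAS'}))
print([frame['id'] for frame in frames])
```

Passing the fetch function in as an argument keeps the retry policy in one place and lets the loop be tested without network access.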
Comment on lines +266 to +274

    start = datetime.datetime.now(timezone.utc) - datetime.timedelta(hours=runtime_context.REQUEUE_LOOKBACK_HOURS)
    end = datetime.datetime.now(timezone.utc)
    raw_frames += query.from_archive(start, end, obstype, 0, runtime_context, related_frames=False)
    # Get the reduced frames
    reduced_frames += query.from_archive(start, end, obstype, runtime_context.reduction_level, runtime_context, related_frames=True)

    # cross match to find any that are missing
    missing_frames = cross_match_missing_frames(raw_frames, reduced_frames)
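
The body of `cross_match_missing_frames` is not visible in this snippet, so the following is only a sketch of what such a cross-match could look like. It assumes each frame is a dict with a `basename` whose last two characters encode the reduction level (for example `...-e00` for raw and `...-e91` for reduced); that field name, the naming scheme, and the function names here are assumptions for illustration, not taken from the PR.

```python
def strip_reduction_level(frame):
    # Hypothetical key: drop the trailing two-digit reduction level.
    return frame['basename'][:-2]


def find_missing_frames(raw_frames, reduced_frames, key=strip_reduction_level):
    """Return the raw frames that have no reduced counterpart under `key`."""
    reduced_keys = {key(frame) for frame in reduced_frames}
    return [frame for frame in raw_frames if key(frame) not in reduced_keys]


# Usage with made-up frame records.
raw = [{'basename': 'site-inst-20240101-0001-e00'},
       {'basename': 'site-inst-20240101-0002-e00'}]
reduced = [{'basename': 'site-inst-20240101-0001-e91'}]
missing = find_missing_frames(raw, reduced)
print(missing)
```

Building a set of keys from the reduced frames first keeps the comparison linear in the number of frames rather than quadratic.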

Comment on lines +274 to +275

    with dbs.get_session(os.environ['DB_ADDRESS']) as db_session:

    archive_auth_header = settings.ARCHIVE_AUTH_HEADER
    response = requests.get(url, headers=archive_auth_header)
    response.raise_for_status()
    response = archive_get(url, params={}, headers=archive_auth_header)

Comment on lines 86 to 96

    if is_raw_frame:
        url = f'{context.RAW_DATA_FRAME_URL}/{frame_id}/?include_related_frames=false'
        archive_auth_header = context.RAW_DATA_AUTH_HEADER
    else:
        url = f'{context.ARCHIVE_FRAME_URL}/{frame_id}/?include_related_frames=false'
        archive_auth_header = context.ARCHIVE_AUTH_HEADER
    logger.info(f"Requesting archive URL {url} (auth header present: {bool(archive_auth_header)})")

    try:
        response = requests.get(url, headers=archive_auth_header, timeout=30)
        response.raise_for_status()
    except requests.exceptions.HTTPError:
        message = 'Error downloading file from archive.'
        if int(response.status_code) == 429:
            message += ' Rate limited.'
        logger.error(
            message,
            extra_tags={
                'filename': file_info.get('filename'),
                'attempt_number': download_from_s3.statistics['attempt_number']
            }
        )
        raise
    except requests.exceptions.RequestException as e:
        message = "Archive download connection error."
        logger.error(
            f"{message} {e}",
            extra_tags={
                'filename': file_info.get('filename'),
                'attempt_number': download_from_s3.statistics['attempt_number']
            }
        )
        raise
    response = archive_get(url, params={'related_frames': False},
                           auth_headers=archive_auth_header)

Comment on lines +106 to +107

    response = archive_get(response_data['url'], params={},
                           auth_headers=archive_auth_header, timeout=60)

Comment on lines +62 to +66

    try:
        queue_name = get_processing_queue(body, self.runtime_context)
    except Exception:
        message.ack()
        return
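
The `try`/`except` above acknowledges the message without recording why queue selection failed, which can make dropped frames hard to trace. One possible variation is to log the exception before acking. The sketch below uses hypothetical stand-ins (`FakeMessage`, `route_message`, and the `get_queue` callable) rather than the listener's real classes, and the standard `logging` module rather than banzai's logger.

```python
import logging

logger = logging.getLogger('requeue_sketch')


class FakeMessage:
    """Stand-in for a broker message; only tracks whether ack() was called."""

    def __init__(self):
        self.acked = False

    def ack(self):
        self.acked = True


def route_message(body, message, get_queue):
    """Return the queue name, or None after logging and acking an unroutable message."""
    try:
        return get_queue(body)
    except Exception:
        # logger.exception records the traceback, so the drop is traceable.
        logger.exception('Could not determine processing queue; dropping message: %s', body)
        message.ack()
        return None


def failing_queue_lookup(body):
    # Simulates a body that lacks the information needed to pick a queue.
    raise KeyError('instrument')


msg = FakeMessage()
result = route_message({'filename': 'example.fits'}, msg, failing_queue_lookup)
print(result, msg.acked)
```

Whether to ack or reject an unroutable message is a design choice for the listener; the sketch keeps the ack from the diff and only adds the log line.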
This PR adds a cron job to requeue missing frames, using the same cron container as the calibration scheduler. The query relies only on the archive API.
I also refactored some of the archive querying to better reuse the retry logic and reduce code duplication.